You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by gatorsmile <gi...@git.apache.org> on 2018/01/06 08:37:56 UTC

[GitHub] spark pull request #20171: Support vectorized udf

GitHub user gatorsmile opened a pull request:

    https://github.com/apache/spark/pull/20171

    Support vectorized udf

    ## What changes were proposed in this pull request?
    
    (Please fill in changes proposed in this fix)
    
    ## How was this patch tested?
    
    (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
    (If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gatorsmile/spark supportVectorizedUDF

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20171.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20171
    
----
commit 5e0c8e1f08b4bb0716b8f21a8393acea361981dc
Author: gatorsmile <ga...@...>
Date:   2018-01-06T08:06:36Z

    fix

commit f41e74e9ba87900dbd66cec12fbb579755d4290b
Author: gatorsmile <ga...@...>
Date:   2018-01-06T08:20:41Z

    Merge remote-tracking branch 'upstream/master' into supportVectorizedUDF

commit 3983bcbbd7a1279b899e72d07822afa5d4ef7749
Author: gatorsmile <ga...@...>
Date:   2018-01-06T08:36:37Z

    rename

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #86121 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86121/testReport)** for PR 20171 at commit [`99fc0b2`](https://github.com/apache/spark/commit/99fc0b2eb36efedc91c40c8eb377bd2a6d0e23a2).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161532260
  
    --- Diff: python/pyspark/sql/context.py ---
    @@ -204,15 +206,31 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = sqlContext.udf.register("slen", slen)
    +        >>> sqlContext.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = sqlContext.udf.register("random_udf", random_udf)
    --- End diff --
    
    I am fine about the naming convention. Do we have a style recommendation for it?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Merged to master and branch-2.3.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161649497
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and not isinstance(returnType, DataType):
    +                returnType = _parse_datatype_string(returnType)
    +            if returnType is not None and returnType != f.returnType:
    --- End diff --
    
    I am not saying we should have the same message. I am trying to persuade you to throw an error in this case. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r160318804
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3616,6 +3616,34 @@ def test_vectorized_udf_basic(self):
                             bool_f(col('bool')))
             self.assertEquals(df.collect(), res.collect())
     
    +    def test_register_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf
    +        from pyspark.rdd import PythonEvalType
    +        twoArgsPandasUDF = pandas_udf(lambda x: len(x), IntegerType())
    --- End diff --
    
    The name is wrong: there is only one arg.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161649821
  
    --- Diff: python/pyspark/sql/context.py ---
    @@ -204,15 +206,31 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = sqlContext.udf.register("slen", slen)
    +        >>> sqlContext.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = sqlContext.udf.register("random_udf", random_udf)
    --- End diff --
    
    I can revert it back if you want to take it. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    I think that's because we expect Pandas's `Series` in the arguments. Correct usage will be something like `x.str.len() + y`. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #85748 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85748/testReport)** for PR 20171 at commit [`fe8dcbe`](https://github.com/apache/spark/commit/fe8dcbe47c7800e771420b416f6e5352c745e85c).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161532662
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and not isinstance(returnType, DataType):
    +                returnType = _parse_datatype_string(returnType)
    +            if returnType is not None and returnType != f.returnType:
    --- End diff --
    
    cc @cloud-fan @taku-k 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161657719
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and not isinstance(returnType, DataType):
    +                returnType = _parse_datatype_string(returnType)
    +            if returnType is not None and returnType != f.returnType:
    --- End diff --
    
    Optional value is okay but I mean it's better to throw an exception. I am not seeing the advantage of supporting this optionally. @ueshin do you think it's better to support this case?
    
    I am less sure of the point of supporting `returnType` with UDF when we are disallowed to change. It causes confusion like we allow it but then if the type is different, we will issue an exception.
    
    Is it more important to allow this corner case than we make the APIs clear as if we have `def register(name, f) # for UDF` alone? We can keep clear about disallowing `returnType` at register time too.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161385554
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3616,6 +3622,21 @@ def test_vectorized_udf_basic(self):
                             bool_f(col('bool')))
             self.assertEquals(df.collect(), res.collect())
     
    +    def test_register_nondeterministic_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf
    +        from pyspark.rdd import PythonEvalType
    +        import random
    +        randomPandasUDF = pandas_udf(
    +            lambda x: random.randint(6, 6) + x, IntegerType()).asNondeterministic()
    +        self.assertEqual(randomPandasUDF.deterministic, False)
    +        self.assertEqual(randomPandasUDF.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +        nondeterministicPandasUDF = self.spark.catalog.registerFunction(
    --- End diff --
    
    `nondeterministicPandasUDF` -> `nondeterministic_pandas_udf`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #86166 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86166/testReport)** for PR 20171 at commit [`d73ab3b`](https://github.com/apache/spark/commit/d73ab3b5af06d7b842eee5a2403242ef4f32c3a1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r160086985
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -265,12 +267,23 @@ def registerFunction(self, name, f, returnType=StringType()):
             [Row(random_udf()=u'82')]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
             [Row(random_udf()=u'62')]
    +
    +        >>> import random
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import pandas_udf
    +        >>> random_pandas_udf = pandas_udf(
    +        ...     lambda x: random.randint(0, 100) + x, IntegerType())
    +        ...     .asNondeterministic()  # doctest: +SKIP
    +        >>> _ = spark.catalog.registerFunction(
    +        ...     "random_pandas_udf", random_pandas_udf, IntegerType())  # doctest: +SKIP
    +        >>> spark.sql("SELECT random_pandas_udf(2)").collect()  # doctest: +SKIP
    +        [Row(random_pandas_udf(2)=84)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
                 udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    +                                      evalType=f.evalType,
    --- End diff --
    
    > when it's not a PythonEvalType.SQL_BATCHED_UDF
    ->
    > when it's neither a `PythonEvalType.SQL_BATCHED_UDF` nor `PythonEvalType.SQL_PANDAS_SCALAR_UDF`, right?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r160318854
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3616,6 +3616,34 @@ def test_vectorized_udf_basic(self):
                             bool_f(col('bool')))
             self.assertEquals(df.collect(), res.collect())
     
    +    def test_register_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf
    +        from pyspark.rdd import PythonEvalType
    +        twoArgsPandasUDF = pandas_udf(lambda x: len(x), IntegerType())
    --- End diff --
    
    `x.str.len()` instead of `len(x)`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85748/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Let me close this PR and open a new PR to introduce a new function `registerUDF`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161639815
  
    --- Diff: python/pyspark/sql/context.py ---
    @@ -204,15 +206,31 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = sqlContext.udf.register("slen", slen)
    +        >>> sqlContext.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = sqlContext.udf.register("random_udf", random_udf)
    --- End diff --
    
    I will do it in this PR.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161654136
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and not isinstance(returnType, DataType):
    +                returnType = _parse_datatype_string(returnType)
    +            if returnType is not None and returnType != f.returnType:
    --- End diff --
    
    Is it common in our current PySpark impl?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161638411
  
    --- Diff: python/pyspark/sql/context.py ---
    @@ -204,15 +206,31 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = sqlContext.udf.register("slen", slen)
    +        >>> sqlContext.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = sqlContext.udf.register("random_udf", random_udf)
    --- End diff --
    
    The current change keeps `sqlContext`. We just need to add a test case for `sqlContext.registerFunction`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161402514
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,56 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and returnType != f.returnType:
    --- End diff --
    
    I did not get your point. If we just check `returnType != f.returnType`, it will fail, because `None != f.returnType` is always true.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    @ueshin @cloud-fan @HyukjinKwon @icexelloss 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161603693
  
    --- Diff: python/pyspark/sql/context.py ---
    @@ -204,15 +206,31 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = sqlContext.udf.register("slen", slen)
    +        >>> sqlContext.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = sqlContext.udf.register("random_udf", random_udf)
    --- End diff --
    
    Doctest is for testing purpose too. I intended to do this in a separate PR and that's why I suggest to leave it as was.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85764/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r160441123
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3616,6 +3616,34 @@ def test_vectorized_udf_basic(self):
                             bool_f(col('bool')))
             self.assertEquals(df.collect(), res.collect())
     
    +    def test_register_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf
    +        from pyspark.rdd import PythonEvalType
    +        twoArgsPandasUDF = pandas_udf(lambda x: len(x), IntegerType())
    +        self.assertEqual(twoArgsPandasUDF.deterministic, True)
    +        self.assertEqual(twoArgsPandasUDF.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +        newPandasUDF = self.spark.catalog.registerFunction(
    +            "twoArgsPandasUDF", twoArgsPandasUDF, IntegerType())
    +        self.assertEqual(newPandasUDF.deterministic, True)
    +        self.assertEqual(newPandasUDF.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +        [row] = self.spark.sql("SELECT twoArgsPandasUDF('test')").collect()
    +        self.assertEqual(row[0], 4)
    +
    +    def test_register_nondeterministic_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf
    +        from pyspark.rdd import PythonEvalType
    +        import random
    +        randomPandasUDF = pandas_udf(
    +            lambda x: random.randint(6, 6) + x, StringType()).asNondeterministic()
    --- End diff --
    
    Sounds good to me too. Another alternative is to have a registerUDF(name, udf) instead of having registerFunction work with both lambda function and UDFs. This way we don't need to have the confusing situation with `returnType` arg.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161654514
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and not isinstance(returnType, DataType):
    +                returnType = _parse_datatype_string(returnType)
    +            if returnType is not None and returnType != f.returnType:
    --- End diff --
    
    I might miss something but I think it's okay to take `returnType` parameter optionally if the value is the same as the udf's.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161647061
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and not isinstance(returnType, DataType):
    +                returnType = _parse_datatype_string(returnType)
    +            if returnType is not None and returnType != f.returnType:
    --- End diff --
    
    The function already has the parameter `returnType`. Thus,  `def register(name, f)  # for UDF` is not right.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #85764 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85764/testReport)** for PR 20171 at commit [`3c08f3d`](https://github.com/apache/spark/commit/3c08f3d6b7ec58735260de687bb74b104e6f7009).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r160319379
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3616,6 +3616,34 @@ def test_vectorized_udf_basic(self):
                             bool_f(col('bool')))
             self.assertEquals(df.collect(), res.collect())
     
    +    def test_register_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf
    +        from pyspark.rdd import PythonEvalType
    +        twoArgsPandasUDF = pandas_udf(lambda x: len(x), IntegerType())
    +        self.assertEqual(twoArgsPandasUDF.deterministic, True)
    +        self.assertEqual(twoArgsPandasUDF.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +        newPandasUDF = self.spark.catalog.registerFunction(
    +            "twoArgsPandasUDF", twoArgsPandasUDF, IntegerType())
    +        self.assertEqual(newPandasUDF.deterministic, True)
    +        self.assertEqual(newPandasUDF.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +        [row] = self.spark.sql("SELECT twoArgsPandasUDF('test')").collect()
    +        self.assertEqual(row[0], 4)
    +
    +    def test_register_nondeterministic_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf
    +        from pyspark.rdd import PythonEvalType
    +        import random
    +        randomPandasUDF = pandas_udf(
    +            lambda x: random.randint(6, 6) + x, StringType()).asNondeterministic()
    --- End diff --
    
    I'm not quite sure about 3, I think return type is a property of the defined UDF, not a register-time stuff. So if users are registering a UDF(not python function), it's not allowed to specify the `returnType` parameter.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161414648
  
    --- Diff: python/pyspark/sql/context.py ---
    @@ -204,15 +206,31 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = sqlContext.udf.register("slen", slen)
    +        >>> sqlContext.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = sqlContext.udf.register("random_udf", random_udf)
    --- End diff --
    
    `newRandom_udf` -> `new_random_udf`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161385504
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,56 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and returnType != f.returnType:
    +                raise ValueError(
    +                    "Invalid returnType: the provided returnType (%s) is inconsistent with "
    +                    "the returnType (%s) of the provided f. When the provided f is a UDF, "
    +                    "returnType is not needed." % (returnType, f.returnType))
    +            registerUDF = UserDefinedFunction(f.func, returnType=f.returnType, name=name,
    +                                              evalType=f.evalType,
    +                                              deterministic=f.deterministic)
    +            returnUDF = f
    --- End diff --
    
    `returnUDF` -> `return_udf`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86101/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r160583088
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3616,6 +3616,34 @@ def test_vectorized_udf_basic(self):
                             bool_f(col('bool')))
             self.assertEquals(df.collect(), res.collect())
     
    +    def test_register_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf
    +        from pyspark.rdd import PythonEvalType
    +        twoArgsPandasUDF = pandas_udf(lambda x: len(x), IntegerType())
    +        self.assertEqual(twoArgsPandasUDF.deterministic, True)
    +        self.assertEqual(twoArgsPandasUDF.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +        newPandasUDF = self.spark.catalog.registerFunction(
    +            "twoArgsPandasUDF", twoArgsPandasUDF, IntegerType())
    +        self.assertEqual(newPandasUDF.deterministic, True)
    +        self.assertEqual(newPandasUDF.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +        [row] = self.spark.sql("SELECT twoArgsPandasUDF('test')").collect()
    +        self.assertEqual(row[0], 4)
    +
    +    def test_register_nondeterministic_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf
    +        from pyspark.rdd import PythonEvalType
    +        import random
    +        randomPandasUDF = pandas_udf(
    +            lambda x: random.randint(6, 6) + x, StringType()).asNondeterministic()
    --- End diff --
    
    sounds good.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86166/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161400802
  
    --- Diff: python/pyspark/sql/context.py ---
    @@ -204,15 +206,31 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = sqlContext.udf.register("slen", slen)
    +        >>> sqlContext.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = sqlContext.udf.register("random_udf", random_udf)
    --- End diff --
    
    `sqlContext` has been deprecated since 2.0. SparkSession should be the default entrance. Here, the example is just to show the way we recommend to the users.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r160310548
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3616,6 +3616,34 @@ def test_vectorized_udf_basic(self):
                             bool_f(col('bool')))
             self.assertEquals(df.collect(), res.collect())
     
    +    def test_register_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf
    +        from pyspark.rdd import PythonEvalType
    +        twoArgsPandasUDF = pandas_udf(lambda x: len(x), IntegerType())
    +        self.assertEqual(twoArgsPandasUDF.deterministic, True)
    +        self.assertEqual(twoArgsPandasUDF.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +        newPandasUDF = self.spark.catalog.registerFunction(
    +            "twoArgsPandasUDF", twoArgsPandasUDF, IntegerType())
    +        self.assertEqual(newPandasUDF.deterministic, True)
    +        self.assertEqual(newPandasUDF.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +        [row] = self.spark.sql("SELECT twoArgsPandasUDF('test')").collect()
    +        self.assertEqual(row[0], 4)
    +
    +    def test_register_nondeterministic_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf
    +        from pyspark.rdd import PythonEvalType
    +        import random
    +        randomPandasUDF = pandas_udf(
    +            lambda x: random.randint(6, 6) + x, StringType()).asNondeterministic()
    --- End diff --
    
    good question, also cc @ueshin 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86165/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161723055
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +261,55 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> new_random_udf = spark.catalog.registerFunction("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    -        >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(random_udf()=82)]
    +        >>> spark.range(1).select(new_random_udf()).collect()  # doctest: +SKIP
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if returnType is not None:
    +                raise TypeError(
    +                    "Invalid returnType: None is expected when f is a UserDefinedFunction, "
    --- End diff --
    
    Here too, I think here we should say `returnType` is disallowed to be set when `f` is a `UserDefinedFunction`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    retest this please 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r160229142
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3616,6 +3616,34 @@ def test_vectorized_udf_basic(self):
                             bool_f(col('bool')))
             self.assertEquals(df.collect(), res.collect())
     
    +    def test_register_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf
    +        from pyspark.rdd import PythonEvalType
    +        twoArgsPandasUDF = pandas_udf(lambda x: len(x), IntegerType())
    +        self.assertEqual(twoArgsPandasUDF.deterministic, True)
    +        self.assertEqual(twoArgsPandasUDF.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +        newPandasUDF = self.spark.catalog.registerFunction(
    +            "twoArgsPandasUDF", twoArgsPandasUDF, IntegerType())
    +        self.assertEqual(newPandasUDF.deterministic, True)
    +        self.assertEqual(newPandasUDF.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +        [row] = self.spark.sql("SELECT twoArgsPandasUDF('test')").collect()
    +        self.assertEqual(row[0], 4)
    +
    +    def test_register_nondeterministic_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf
    +        from pyspark.rdd import PythonEvalType
    +        import random
    +        randomPandasUDF = pandas_udf(
    +            lambda x: random.randint(6, 6) + x, StringType()).asNondeterministic()
    --- End diff --
    
    The UDF returnType doesn't match the returnType in registerFunction, what's the expected behavior in this case?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161638977
  
    --- Diff: python/pyspark/sql/context.py ---
    @@ -204,15 +206,31 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = sqlContext.udf.register("slen", slen)
    +        >>> sqlContext.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = sqlContext.udf.register("random_udf", random_udf)
    --- End diff --
    
    I mean it doesn't completely cover the concern:
    
    > `sqlContext` has been deprecated since 2.0. SparkSession should be the default entrance
    
    and this change doesn't completely replace it too. If it's meant to be separate, we should better leave this change out. What I was wondering is why this partially fixes this concern in a separate PR.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161414948
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and not isinstance(returnType, DataType):
    +                returnType = _parse_datatype_string(returnType)
    +            if returnType is not None and returnType != f.returnType:
    --- End diff --
    
    I mean we can simply throw an exception always if `returnType` is given (not `None`) but `f` is a udf. I thought we try to resemable an overloading for`register(name, f)`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161592401
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    --- End diff --
    
    This is to avoid generating the random hex value returned by PySpark. You can try `spark.udf.register("add_one", add_one)`
    
    With the underscore placeholder, we can remove `# doctest: +SKIP`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/20171


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #86100 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86100/testReport)** for PR 20171 at commit [`423c832`](https://github.com/apache/spark/commit/423c832b89f8bef5f4812d24c67921644f169f15).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161579882
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    --- End diff --
    
    Is there a reason to return to a underscore placeholder?  It might seem confusing to users if not required


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161711980
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -4037,6 +4082,15 @@ def test_simple(self):
             expected = df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
             self.assertFramesEqual(expected, result)
     
    +    def test_register_group_map_udf(self):
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUP_MAP)
    +        with QuietTest(self.sc):
    +            with self.assertRaisesRegexp(ValueError, 'f must be either SQL_BATCHED_UDF or '
    +                                                     'SQL_PANDAS_SCALAR_UDF'):
    +                self.spark.catalog.registerFunction("foo_udf", foo_udf)
    --- End diff --
    
    ditto.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #85750 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85750/testReport)** for PR 20171 at commit [`3411dcc`](https://github.com/apache/spark/commit/3411dccc6ab57f3135a99d2b2b535fd5c135cbc7).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161659245
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and not isinstance(returnType, DataType):
    +                returnType = _parse_datatype_string(returnType)
    +            if returnType is not None and returnType != f.returnType:
    --- End diff --
    
    I see what you mean. Now I became neutral but slightly on your side.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85754/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161385550
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3616,6 +3622,21 @@ def test_vectorized_udf_basic(self):
                             bool_f(col('bool')))
             self.assertEquals(df.collect(), res.collect())
     
    +    def test_register_nondeterministic_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf
    +        from pyspark.rdd import PythonEvalType
    +        import random
    +        randomPandasUDF = pandas_udf(
    --- End diff --
    
    `randomPandasUDF` -> `random_pandas_udf`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161637979
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and not isinstance(returnType, DataType):
    +                returnType = _parse_datatype_string(returnType)
    +            if returnType is not None and returnType != f.returnType:
    --- End diff --
    
    Since the API already has this parameter `returnType`, we should support it if possible. We need to do our best to avoid issuing the unnecessary exception. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161649275
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and not isinstance(returnType, DataType):
    +                returnType = _parse_datatype_string(returnType)
    +            if returnType is not None and returnType != f.returnType:
    --- End diff --
    
    What we want to support is `def register(name, f) # for UDF` so I am saying let's fix it as if we have `def register(name, f) # for UDF` which prints an error when `returnType` is given as described above.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #85759 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85759/testReport)** for PR 20171 at commit [`b801e70`](https://github.com/apache/spark/commit/b801e7094c7c730adb53a25e13363973930a0b42).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161649243
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and not isinstance(returnType, DataType):
    +                returnType = _parse_datatype_string(returnType)
    +            if returnType is not None and returnType != f.returnType:
    --- End diff --
    
    ```
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    TypeError: register() takes exactly 2 arguments (3 given)
    ```
    
    This error message is still confusing to end users. We need to explicitly explain that `returnType` is not allowed when `f` is a UDF.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86121/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #85748 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85748/testReport)** for PR 20171 at commit [`fe8dcbe`](https://github.com/apache/spark/commit/fe8dcbe47c7800e771420b416f6e5352c745e85c).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86100/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161637065
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and not isinstance(returnType, DataType):
    +                returnType = _parse_datatype_string(returnType)
    +            if returnType is not None and returnType != f.returnType:
    --- End diff --
    
    I think we are trying to avoid to set `returnType` at register time. Current way appearently allows to take `returnType` when they are matched. Also, the suggestion allows resemble the overloading of the Scala version we talked. I would like to get this into 2.3 and push forward.
    
    Could you maybe eblabourate why you are not sure? Let me try to explain it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85747/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161647332
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and not isinstance(returnType, DataType):
    +                returnType = _parse_datatype_string(returnType)
    +            if returnType is not None and returnType != f.returnType:
    --- End diff --
    
    ```
        def register(self, name, f, returnType=None):
            return self.sqlContext.registerFunction(name, f, returnType)
    ```
    
    `returnType` is a parameter already. `def register(name, f)  # for UDF` is not true.
    
    Now, the issue we are facing is whether we should issue an exception or not when users provide the same returnType as the UDF's. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #86100 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86100/testReport)** for PR 20171 at commit [`423c832`](https://github.com/apache/spark/commit/423c832b89f8bef5f4812d24c67921644f169f15).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161635142
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and not isinstance(returnType, DataType):
    +                returnType = _parse_datatype_string(returnType)
    +            if returnType is not None and returnType != f.returnType:
    --- End diff --
    
    I am not sure which one is better. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r160048270
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -265,12 +267,23 @@ def registerFunction(self, name, f, returnType=StringType()):
             [Row(random_udf()=u'82')]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
             [Row(random_udf()=u'62')]
    +
    +        >>> import random
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import pandas_udf
    +        >>> random_pandas_udf = pandas_udf(
    +        ...     lambda x: random.randint(0, 100) + x, IntegerType())
    +        ...     .asNondeterministic()  # doctest: +SKIP
    +        >>> _ = spark.catalog.registerFunction(
    +        ...     "random_pandas_udf", random_pandas_udf, IntegerType())  # doctest: +SKIP
    +        >>> spark.sql("SELECT random_pandas_udf(2)").collect()  # doctest: +SKIP
    +        [Row(random_pandas_udf(2)=84)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
                 udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    +                                      evalType=f.evalType,
    --- End diff --
    
    I haven't started to review yet as it looks WIP but let's don't forget to fail fast when it's not a `PythonEvalType.SQL_BATCHED_UDF` as we discussed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161531698
  
    --- Diff: python/pyspark/sql/context.py ---
    @@ -204,15 +206,31 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = sqlContext.udf.register("slen", slen)
    +        >>> sqlContext.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = sqlContext.udf.register("random_udf", random_udf)
    --- End diff --
    
    Maybe you can submit a separate PR for it? For testing purpose, we should do it in a test suite instead of using doc. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161385427
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,56 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and returnType != f.returnType:
    --- End diff --
    
    Could we just simply exclude `returnType != f.returnType`? I think return type could be a string too so this case might be failed:
    
    I just double checked:
    
    ```python
    from pyspark.rdd import PythonEvalType
    from pyspark.sql.functions import pandas_udf, col, expr
    
    original_add = pandas_udf(lambda x, y: x + y, "integer")
    spark.udf.register("add", original_add, "integer")
    ```
    
    ```
    ValueError: Invalid returnType: the provided returnType (integer) is inconsistent with the returnType 
    (IntegerType) of the provided f. When the provided f is a UDF, returnType is not needed.
    ```



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161640273
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and not isinstance(returnType, DataType):
    +                returnType = _parse_datatype_string(returnType)
    +            if returnType is not None and returnType != f.returnType:
    --- End diff --
    
    I see. So,
    
    > Since the API already has this parameter `returnType`, we should support it if possible. 
    
    I get it. So, we want to disallow to set `returnType` at register time but also leave `returnType` support permissively as possible as we can. I think failing fast (it's not in runtime) is fine if it give a clear message.
    
    Consider this, what we want is both APIs:
    
    ```
    def register(name, f)  # for UDF
    
    def register(name, f, returnType)  # for Python native functions
    ```
    
    I think we should only support both cases in a better API design.
    
    What I am worried is JIRAs and questions from users that why
    
    ```
    def register(name, f, retunType)  # for UDF
    ```
    
    is not supported because we will apparently allow a single case in this way.
    
    Consider another case of this: if we only have this:
    
    ```
    def register(name, f)  # for UDF
    ```
    
    calling like this:
    
    ```
    register("a", lambda x: x, 1)
    ```
    
    throw an `TypeError` as below:
    
    ```
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    TypeError: register() takes exactly 2 arguments (3 given)
    ```
    
    If we should allow the same `returnType` later, we can simply allow it because this case is less stricter.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #85764 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85764/testReport)** for PR 20171 at commit [`3c08f3d`](https://github.com/apache/spark/commit/3c08f3d6b7ec58735260de687bb74b104e6f7009).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161385502
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,56 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and returnType != f.returnType:
    +                raise ValueError(
    +                    "Invalid returnType: the provided returnType (%s) is inconsistent with "
    +                    "the returnType (%s) of the provided f. When the provided f is a UDF, "
    +                    "returnType is not needed." % (returnType, f.returnType))
    +            registerUDF = UserDefinedFunction(f.func, returnType=f.returnType, name=name,
    --- End diff --
    
    `registerUDF` -> `register_udf`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161385602
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -4037,6 +4075,21 @@ def test_simple(self):
             expected = df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
             self.assertFramesEqual(expected, result)
     
    +    def test_register_group_map_udf(self):
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        foo_udf = pandas_udf(
    +            lambda pdf: pdf.assign(v1=pdf.id * 1.0),
    +            StructType(
    +                [StructField('id', LongType()),
    +                 StructField('v1', DoubleType())]),
    +            PandasUDFType.GROUP_MAP
    +        )
    --- End diff --
    
    We could simplify this to 
    
    ```python
    foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUP_MAP)
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #86166 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86166/testReport)** for PR 20171 at commit [`d73ab3b`](https://github.com/apache/spark/commit/d73ab3b5af06d7b842eee5a2403242ef4f32c3a1).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161710081
  
    --- Diff: python/pyspark/sql/context.py ---
    @@ -174,18 +174,23 @@ def range(self, start, end=None, step=1, numPartitions=None):
     
         @ignore_unicode_prefix
         @since(1.2)
    -    def registerFunction(self, name, f, returnType=StringType()):
    +    def registerFunction(self, name, f, returnType=None):
             """Registers a Python function (including lambda function) or a :class:`UserDefinedFunction`
    -        as a UDF. The registered UDF can be used in SQL statement.
    +        as a UDF. The registered UDF can be used in SQL statements.
     
    -        In addition to a name and the function itself, the return type can be optionally specified.
    -        When the return type is not given it default to a string and conversion will automatically
    -        be done.  For any other return type, the produced object must match the specified type.
    +        :func:`spark.udf.register` is an alias for :func:`sqlContext.registerFunction`.
    --- End diff --
    
    :func:`sqlContext.registerFunction` is an alias for :func:`spark.udf.register`. ?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Will try to handle with doc and minor stuff soon within few days. Seems it might be a bit more tricky then I thought.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161414603
  
    --- Diff: python/pyspark/sql/context.py ---
    @@ -204,15 +206,31 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = sqlContext.udf.register("slen", slen)
    +        >>> sqlContext.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = sqlContext.udf.register("random_udf", random_udf)
    --- End diff --
    
    In that way, we should replace `sqlContext` to `spark`. It's for testing purpose too as these are actually ran. Also, we should leave a note that it's an alias for `udf.register` too with a warn from `warning` for an IDE to detect deprecated methods and for users to see the warning. If we will just have an exact doc, we can simply reassign `__doc__` as suggested by @ueshin and @icexelloss. Simplest way is just to leave as was.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161415247
  
    --- Diff: python/pyspark/sql/context.py ---
    @@ -204,15 +206,31 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = sqlContext.udf.register("slen", slen)
    +        >>> sqlContext.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = sqlContext.udf.register("random_udf", random_udf)
    --- End diff --
    
    I know it's a bit confusing. It's because we started to have the same names in the API. Similar things also apply to R. We follow PEP 8 with few exceptions. It should be with underscore in general if possible. There's an example to refer , `threading.py` in Python. It also happened to have the similar case with us.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #85747 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85747/testReport)** for PR 20171 at commit [`3983bcb`](https://github.com/apache/spark/commit/3983bcbbd7a1279b899e72d07822afa5d4ef7749).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161635087
  
    --- Diff: python/pyspark/sql/context.py ---
    @@ -204,15 +206,31 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = sqlContext.udf.register("slen", slen)
    +        >>> sqlContext.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = sqlContext.udf.register("random_udf", random_udf)
    --- End diff --
    
    http://spark.apache.org/contributing.html I just checked it. It is already documented.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #86165 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86165/testReport)** for PR 20171 at commit [`47bce1e`](https://github.com/apache/spark/commit/47bce1efd8b86334f90e38a137d46ccef0085245).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161643600
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and not isinstance(returnType, DataType):
    +                returnType = _parse_datatype_string(returnType)
    +            if returnType is not None and returnType != f.returnType:
    --- End diff --
    
    Because you can only set the `returnType` of the `UserDefinedFunction`, I think it is confusing to let users to pass in returnType and require it to be matched with `returnType` of the `UserDefinedFunction`.
    
    I think we can throw an exception when the `returnType` is not None for `UserDefinedFunction` case.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #85754 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85754/testReport)** for PR 20171 at commit [`b801e70`](https://github.com/apache/spark/commit/b801e7094c7c730adb53a25e13363973930a0b42).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    ```
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.functions import col, lit
    from pyspark.sql.types import LongType
    df = spark.range(3)
    f = pandas_udf(lambda x, y: len(x) + y, LongType())
    df.select(f(lit('text'), col('id'))).show()
    ```
    
    The result is wrong. cc @icexelloss @BryanCutler @taku-k @cloud-fan 
    ```
    +------------------+
    |<lambda>(text, id)|
    +------------------+
    |                 1|
    |                 2|
    |                 3|
    +------------------+
    ```
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85759/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85750/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #85747 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85747/testReport)** for PR 20171 at commit [`3983bcb`](https://github.com/apache/spark/commit/3983bcbbd7a1279b899e72d07822afa5d4ef7749).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161647635
  
    --- Diff: python/pyspark/sql/context.py ---
    @@ -204,15 +206,31 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = sqlContext.udf.register("slen", slen)
    +        >>> sqlContext.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = sqlContext.udf.register("random_udf", random_udf)
    --- End diff --
    
    I beg to just leave it as was ..


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161400597
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,56 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and returnType != f.returnType:
    +                raise ValueError(
    +                    "Invalid returnType: the provided returnType (%s) is inconsistent with "
    +                    "the returnType (%s) of the provided f. When the provided f is a UDF, "
    +                    "returnType is not needed." % (returnType, f.returnType))
    +            registerUDF = UserDefinedFunction(f.func, returnType=f.returnType, name=name,
    --- End diff --
    
    What is the naming convention in PySpark? Three different styles are found. Pretty confusing. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161650064
  
    --- Diff: python/pyspark/sql/context.py ---
    @@ -204,15 +206,31 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = sqlContext.udf.register("slen", slen)
    +        >>> sqlContext.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = sqlContext.udf.register("random_udf", random_udf)
    --- End diff --
    
    Sure, I want to take it. Thanks.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161711905
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3975,33 +4003,50 @@ def test_vectorized_udf_timestamps_respect_session_timezone(self):
             finally:
                 self.spark.conf.set("spark.sql.session.timeZone", orig_tz)
     
    -    def test_nondeterministic_udf(self):
    +    def test_nondeterministic_vectorized_udf(self):
             # Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations
             from pyspark.sql.functions import udf, pandas_udf, col
     
             @pandas_udf('double')
             def plus_ten(v):
                 return v + 10
    -        random_udf = self.random_udf
    +        random_udf = self.nondeterministic_vectorized_udf
     
             df = self.spark.range(10).withColumn('rand', random_udf(col('id')))
             result1 = df.withColumn('plus_ten(rand)', plus_ten(df['rand'])).toPandas()
     
             self.assertEqual(random_udf.deterministic, False)
             self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 10))
     
    -    def test_nondeterministic_udf_in_aggregate(self):
    +    def test_nondeterministic_vectorized_udf_in_aggregate(self):
             from pyspark.sql.functions import pandas_udf, sum
     
             df = self.spark.range(10)
    -        random_udf = self.random_udf
    +        random_udf = self.nondeterministic_vectorized_udf
     
             with QuietTest(self.sc):
                 with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'):
                     df.groupby(df.id).agg(sum(random_udf(df.id))).collect()
                 with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'):
                     df.agg(sum(random_udf(df.id))).collect()
     
    +    def test_register_vectorized_udf_basic(self):
    +        from pyspark.rdd import PythonEvalType
    +        from pyspark.sql.functions import pandas_udf, col, expr
    +        df = self.spark.range(10).select(
    +            col('id').cast('int').alias('a'),
    +            col('id').cast('int').alias('b'))
    +        original_add = pandas_udf(lambda x, y: x + y, IntegerType())
    +        self.assertEqual(original_add.deterministic, True)
    +        self.assertEqual(original_add.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +        new_add = self.spark.catalog.registerFunction("add1", original_add)
    --- End diff --
    
    `spark.udf.register` instead of `spark.catalog.registerFunction`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161604763
  
    --- Diff: python/pyspark/sql/context.py ---
    @@ -204,15 +206,31 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = sqlContext.udf.register("slen", slen)
    +        >>> sqlContext.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = sqlContext.udf.register("random_udf", random_udf)
    --- End diff --
    
    What do you mean by a style recommendation?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161631616
  
    --- Diff: python/pyspark/sql/context.py ---
    @@ -204,15 +206,31 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = sqlContext.udf.register("slen", slen)
    +        >>> sqlContext.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = sqlContext.udf.register("random_udf", random_udf)
    --- End diff --
    
    Ah, do you maybe literally mean like Scala style guide? It's basically PEP 8.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r160319860
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3616,6 +3616,34 @@ def test_vectorized_udf_basic(self):
                             bool_f(col('bool')))
             self.assertEquals(df.collect(), res.collect())
     
    +    def test_register_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf
    +        from pyspark.rdd import PythonEvalType
    +        twoArgsPandasUDF = pandas_udf(lambda x: len(x), IntegerType())
    +        self.assertEqual(twoArgsPandasUDF.deterministic, True)
    +        self.assertEqual(twoArgsPandasUDF.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +        newPandasUDF = self.spark.catalog.registerFunction(
    +            "twoArgsPandasUDF", twoArgsPandasUDF, IntegerType())
    +        self.assertEqual(newPandasUDF.deterministic, True)
    +        self.assertEqual(newPandasUDF.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +        [row] = self.spark.sql("SELECT twoArgsPandasUDF('test')").collect()
    +        self.assertEqual(row[0], 4)
    +
    +    def test_register_nondeterministic_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf
    +        from pyspark.rdd import PythonEvalType
    +        import random
    +        randomPandasUDF = pandas_udf(
    +            lambda x: random.randint(6, 6) + x, StringType()).asNondeterministic()
    --- End diff --
    
    That sounds good to me, too.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r160089864
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -265,12 +267,23 @@ def registerFunction(self, name, f, returnType=StringType()):
             [Row(random_udf()=u'82')]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
             [Row(random_udf()=u'62')]
    +
    +        >>> import random
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import pandas_udf
    +        >>> random_pandas_udf = pandas_udf(
    +        ...     lambda x: random.randint(0, 100) + x, IntegerType())
    +        ...     .asNondeterministic()  # doctest: +SKIP
    +        >>> _ = spark.catalog.registerFunction(
    +        ...     "random_pandas_udf", random_pandas_udf, IntegerType())  # doctest: +SKIP
    +        >>> spark.sql("SELECT random_pandas_udf(2)").collect()  # doctest: +SKIP
    +        [Row(random_pandas_udf(2)=84)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
                 udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    +                                      evalType=f.evalType,
    --- End diff --
    
    Yup, I think that's right.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161651611
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
    --- End diff --
    
    nit: `new_random_udf`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161385548
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -380,10 +380,16 @@ def test_udf2(self):
     
         def test_udf3(self):
             twoargs = self.spark.catalog.registerFunction(
    -            "twoArgs", UserDefinedFunction(lambda x, y: len(x) + y), IntegerType())
    +            "twoArgs", UserDefinedFunction(lambda x, y: len(x) + y))
             self.assertEqual(twoargs.deterministic, True)
             [row] = self.spark.sql("SELECT twoArgs('test', 1)").collect()
    -        self.assertEqual(row[0], 5)
    +        self.assertEqual(row[0], u'5')
    +
    +    def test_udf_using_registerFunction_incompatibleTypes(self):
    --- End diff --
    
    How about `test_udf_registration_return_type_mismatch`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
GitHub user gatorsmile reopened a pull request:

    https://github.com/apache/spark/pull/20171

    [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL Statement [WIP]

    ## What changes were proposed in this pull request?
    Register Vectorized UDFs for SQL Statement. For example, 
    
    ```Python
    >>> import random
    >>> from pyspark.sql.types import IntegerType
    >>> from pyspark.sql.functions import pandas_udf
    >>> random_pandas_udf = pandas_udf(
    ...     lambda x: random.randint(0, 100) + x, IntegerType())
    ...     .asNondeterministic()  # doctest: +SKIP
    >>> _ = spark.catalog.registerFunction(
    ...     "random_pandas_udf", random_pandas_udf, IntegerType())  # doctest: +SKIP
    >>> spark.sql("SELECT random_pandas_udf(2)").collect()  # doctest: +SKIP
    [Row(random_pandas_udf(2)=84)]
    ```
    
    ## How was this patch tested?
    Added test cases  

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gatorsmile/spark supportVectorizedUDF

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20171.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20171
    
----
commit 5e0c8e1f08b4bb0716b8f21a8393acea361981dc
Author: gatorsmile <ga...@...>
Date:   2018-01-06T08:06:36Z

    fix

commit f41e74e9ba87900dbd66cec12fbb579755d4290b
Author: gatorsmile <ga...@...>
Date:   2018-01-06T08:20:41Z

    Merge remote-tracking branch 'upstream/master' into supportVectorizedUDF

commit 3983bcbbd7a1279b899e72d07822afa5d4ef7749
Author: gatorsmile <ga...@...>
Date:   2018-01-06T08:36:37Z

    rename

commit fe8dcbe47c7800e771420b416f6e5352c745e85c
Author: gatorsmile <ga...@...>
Date:   2018-01-06T09:09:44Z

    fix

commit 3411dccc6ab57f3135a99d2b2b535fd5c135cbc7
Author: gatorsmile <ga...@...>
Date:   2018-01-06T10:00:05Z

    import

commit b801e7094c7c730adb53a25e13363973930a0b42
Author: gatorsmile <ga...@...>
Date:   2018-01-06T11:41:33Z

    fix

commit 3c08f3d6b7ec58735260de687bb74b104e6f7009
Author: gatorsmile <ga...@...>
Date:   2018-01-07T00:41:22Z

    fix

commit 423c832b89f8bef5f4812d24c67921644f169f15
Author: gatorsmile <ga...@...>
Date:   2018-01-13T16:34:39Z

    fix

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #86101 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86101/testReport)** for PR 20171 at commit [`a052a2d`](https://github.com/apache/spark/commit/a052a2d56ac52d6c6d6bdbcdf9588315727c3254).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r160333599
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3616,6 +3616,34 @@ def test_vectorized_udf_basic(self):
                             bool_f(col('bool')))
             self.assertEquals(df.collect(), res.collect())
     
    +    def test_register_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf
    +        from pyspark.rdd import PythonEvalType
    +        twoArgsPandasUDF = pandas_udf(lambda x: len(x), IntegerType())
    --- End diff --
    
    `twoArgsPandasUDF` -> `two_args_pandas_udf` too.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161719551
  
    --- Diff: python/pyspark/sql/context.py ---
    @@ -174,18 +174,23 @@ def range(self, start, end=None, step=1, numPartitions=None):
     
         @ignore_unicode_prefix
         @since(1.2)
    -    def registerFunction(self, name, f, returnType=StringType()):
    +    def registerFunction(self, name, f, returnType=None):
             """Registers a Python function (including lambda function) or a :class:`UserDefinedFunction`
    -        as a UDF. The registered UDF can be used in SQL statement.
    +        as a UDF. The registered UDF can be used in SQL statements.
     
    -        In addition to a name and the function itself, the return type can be optionally specified.
    -        When the return type is not given it default to a string and conversion will automatically
    -        be done.  For any other return type, the produced object must match the specified type.
    +        :func:`spark.udf.register` is an alias for :func:`sqlContext.registerFunction`.
     
    -        :param name: name of the UDF
    -        :param f: a Python function, or a wrapped/native UserDefinedFunction
    -        :param returnType: a :class:`pyspark.sql.types.DataType` object
    -        :return: a wrapped :class:`UserDefinedFunction`
    +        In addition to a name and the function itself, `returnType` can be optionally specified.
    +        1) When f is a Python function, `returnType` defaults to a string. The produced object must
    +        match the specified type. 2) When f is a :class:`UserDefinedFunction`, Spark uses the return
    +        type of the given UDF as the return type of the registered UDF. The input parameter
    +        `returnType` is None by default. If given by users, the value must be None.
    --- End diff --
    
    I think we would simply say that data type is disallowed to set to `returnType` rather then `None` should be set.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #86101 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86101/testReport)** for PR 20171 at commit [`a052a2d`](https://github.com/apache/spark/commit/a052a2d56ac52d6c6d6bdbcdf9588315727c3254).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #85754 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85754/testReport)** for PR 20171 at commit [`b801e70`](https://github.com/apache/spark/commit/b801e7094c7c730adb53a25e13363973930a0b42).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #86121 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86121/testReport)** for PR 20171 at commit [`99fc0b2`](https://github.com/apache/spark/commit/99fc0b2eb36efedc91c40c8eb377bd2a6d0e23a2).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #85759 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85759/testReport)** for PR 20171 at commit [`b801e70`](https://github.com/apache/spark/commit/b801e7094c7c730adb53a25e13363973930a0b42).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #86165 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86165/testReport)** for PR 20171 at commit [`47bce1e`](https://github.com/apache/spark/commit/47bce1efd8b86334f90e38a137d46ccef0085245).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161709870
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -226,18 +226,23 @@ def dropGlobalTempView(self, viewName):
     
         @ignore_unicode_prefix
         @since(2.0)
    -    def registerFunction(self, name, f, returnType=StringType()):
    +    def registerFunction(self, name, f, returnType=None):
             """Registers a Python function (including lambda function) or a :class:`UserDefinedFunction`
    -        as a UDF. The registered UDF can be used in SQL statement.
    +        as a UDF. The registered UDF can be used in SQL statements.
     
    -        In addition to a name and the function itself, the return type can be optionally specified.
    -        When the return type is not given it default to a string and conversion will automatically
    -        be done.  For any other return type, the produced object must match the specified type.
    +        :func:`spark.udf.register` is an alias for :func:`spark.catalog.registerFunction`.
    --- End diff --
    
    :func:`spark.catalog.registerFunction` is an alias for :func:`spark.udf.register`. ?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161605021
  
    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -256,27 +258,58 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.register("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.udf.register("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(<lambda>()=26)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
    +        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
             """
     
             # This is to check whether the input function is a wrapped/native UserDefinedFunction
             if hasattr(f, 'asNondeterministic'):
    -            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
    -                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
    -                                      deterministic=f.deterministic)
    +            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
    +                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    +                raise ValueError(
    +                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
    +            if returnType is not None and not isinstance(returnType, DataType):
    +                returnType = _parse_datatype_string(returnType)
    +            if returnType is not None and returnType != f.returnType:
    --- End diff --
    
    Why did you cc other guys @gatorsmile?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs for SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20171
  
    **[Test build #85750 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85750/testReport)** for PR 20171 at commit [`3411dcc`](https://github.com/apache/spark/commit/3411dccc6ab57f3135a99d2b2b535fd5c135cbc7).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r160318424
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3616,6 +3616,34 @@ def test_vectorized_udf_basic(self):
                             bool_f(col('bool')))
             self.assertEquals(df.collect(), res.collect())
     
    +    def test_register_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf
    +        from pyspark.rdd import PythonEvalType
    +        twoArgsPandasUDF = pandas_udf(lambda x: len(x), IntegerType())
    +        self.assertEqual(twoArgsPandasUDF.deterministic, True)
    +        self.assertEqual(twoArgsPandasUDF.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +        newPandasUDF = self.spark.catalog.registerFunction(
    +            "twoArgsPandasUDF", twoArgsPandasUDF, IntegerType())
    +        self.assertEqual(newPandasUDF.deterministic, True)
    +        self.assertEqual(newPandasUDF.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +        [row] = self.spark.sql("SELECT twoArgsPandasUDF('test')").collect()
    +        self.assertEqual(row[0], 4)
    +
    +    def test_register_nondeterministic_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf
    +        from pyspark.rdd import PythonEvalType
    +        import random
    +        randomPandasUDF = pandas_udf(
    +            lambda x: random.randint(6, 6) + x, StringType()).asNondeterministic()
    --- End diff --
    
    How about the following strategy?
    
    1. make the default value for `returnType` `None`.
    2. if `returnType is None` for a Python function, use `StringType` as the same as the current default value.
    3. if `returnType is None` for UDF, use the UDF's `returnType`, otherwise respect the user specified `returnType`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20171#discussion_r161385482
  
    --- Diff: python/pyspark/sql/context.py ---
    @@ -204,15 +206,31 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
     
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = sqlContext.udf.register("slen", slen)
    +        >>> sqlContext.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
    +
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = sqlContext.udf.register("random_udf", random_udf)
    --- End diff --
    
    Would it be better to keep `sqlContext.registerFunction` as was? The documentation will show the examples for `SQLContext.registerFunction` API ..


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile closed the pull request at:

    https://github.com/apache/spark/pull/20171


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org