You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by HyukjinKwon <gi...@git.apache.org> on 2018/02/26 15:10:31 UTC

[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

GitHub user HyukjinKwon opened a pull request:

    https://github.com/apache/spark/pull/20678

    [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in toPandas/createDataFrame with Pandas DataFrame

    ## What changes were proposed in this pull request?
    
    This PR adds a configuration to control the fallback of Arrow optimization for `toPandas` and `createDataFrame` with Pandas DataFrame.
    
    ## How was this patch tested?
    
    Manually tested and unit tests added.
    
    You can test this by:
    
    **`createDataFrame`**
    
    ```python
    spark.conf.set("spark.sql.execution.arrow.enabled", False)
    pdf = spark.createDataFrame([[{'a': 1}]]).toPandas()
    spark.conf.set("spark.sql.execution.arrow.enabled", True)
    spark.conf.set("spark.sql.execution.arrow.fallback.enabled", True)
    spark.createDataFrame(pdf, "a: map<string, int>")
    ```
    
    ```python
    spark.conf.set("spark.sql.execution.arrow.enabled", False)
    pdf = spark.createDataFrame([[{'a': 1}]]).toPandas()
    spark.conf.set("spark.sql.execution.arrow.enabled", True)
    spark.conf.set("spark.sql.execution.arrow.fallback.enabled", False)
    spark.createDataFrame(pdf, "a: map<string, int>")
    ```
    
    
    **`toPandas`**
    
    ```python
    spark.conf.set("spark.sql.execution.arrow.enabled", True)
    spark.conf.set("spark.sql.execution.arrow.fallback.enabled", True)
    spark.createDataFrame([[{'a': 1}]]).toPandas()
    ```
    
    ```python
    spark.conf.set("spark.sql.execution.arrow.enabled", True)
    spark.conf.set("spark.sql.execution.arrow.fallback.enabled", False)
    spark.createDataFrame([[{'a': 1}]]).toPandas()
    ```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark SPARK-23380-conf

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20678.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20678
    
----
commit ff9d38b691bd54c073db3b55983564cfbb0d903e
Author: hyukjinkwon <gu...@...>
Date:   2018-02-26T15:02:55Z

    Adds a conf for Arrow fallback in toPandas/createDataFrame with Pandas DataFrame

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #87673 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87673/testReport)** for PR 20678 at commit [`ff9d38b`](https://github.com/apache/spark/commit/ff9d38b691bd54c073db3b55983564cfbb0d903e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87962/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r170796605
  
    --- Diff: python/pyspark/sql/dataframe.py ---
    @@ -1986,55 +1986,89 @@ def toPandas(self):
                 timezone = None
     
             if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
    +            should_fallback = False
                 try:
    -                from pyspark.sql.types import _check_dataframe_convert_date, \
    -                    _check_dataframe_localize_timestamps, to_arrow_schema
    +                from pyspark.sql.types import to_arrow_schema
                     from pyspark.sql.utils import require_minimum_pyarrow_version
    +
                     require_minimum_pyarrow_version()
    -                import pyarrow
                     to_arrow_schema(self.schema)
    -                tables = self._collectAsArrow()
    -                if tables:
    -                    table = pyarrow.concat_tables(tables)
    -                    pdf = table.to_pandas()
    -                    pdf = _check_dataframe_convert_date(pdf, self.schema)
    -                    return _check_dataframe_localize_timestamps(pdf, timezone)
    -                else:
    -                    return pd.DataFrame.from_records([], columns=self.columns)
                 except Exception as e:
    -                msg = (
    -                    "Note: toPandas attempted Arrow optimization because "
    -                    "'spark.sql.execution.arrow.enabled' is set to true. Please set it to false "
    -                    "to disable this.")
    -                raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
    -        else:
    -            pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
     
    -            dtype = {}
    +                if self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "false") \
    --- End diff --
    
    Argh, this was my mistake during testing by multiple combinations. Will fix it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87719/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r170763146
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3493,19 +3495,42 @@ def create_pandas_data_frame(self):
             data_dict["4_float_t"] = np.float32(data_dict["4_float_t"])
             return pd.DataFrame(data=data_dict)
     
    -    def test_unsupported_datatype(self):
    -        schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
    -        df = self.spark.createDataFrame([(None,)], schema=schema)
    -        with QuietTest(self.sc):
    -            with self.assertRaisesRegexp(Exception, 'Unsupported type'):
    -                df.toPandas()
    +    @contextmanager
    +    def arrow_fallback(self, enabled):
    --- End diff --
    
    I think it would be best to disable fallback for all the tests on setup/teardown.  That way if something goes wrong elsewhere, the tests won't start passing due to falling back. For the test where it is enabled, you could do that explicitly. What do you think?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r170909693
  
    --- Diff: python/pyspark/sql/dataframe.py ---
    @@ -1986,55 +1986,87 @@ def toPandas(self):
                 timezone = None
     
             if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
    +            should_fallback = False
                 try:
    -                from pyspark.sql.types import _check_dataframe_convert_date, \
    -                    _check_dataframe_localize_timestamps, to_arrow_schema
    +                from pyspark.sql.types import to_arrow_schema
                     from pyspark.sql.utils import require_minimum_pyarrow_version
    +
                     require_minimum_pyarrow_version()
    -                import pyarrow
                     to_arrow_schema(self.schema)
    -                tables = self._collectAsArrow()
    -                if tables:
    -                    table = pyarrow.concat_tables(tables)
    -                    pdf = table.to_pandas()
    -                    pdf = _check_dataframe_convert_date(pdf, self.schema)
    -                    return _check_dataframe_localize_timestamps(pdf, timezone)
    -                else:
    -                    return pd.DataFrame.from_records([], columns=self.columns)
                 except Exception as e:
    -                msg = (
    -                    "Note: toPandas attempted Arrow optimization because "
    -                    "'spark.sql.execution.arrow.enabled' is set to true. Please set it to false "
    -                    "to disable this.")
    -                raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
    -        else:
    -            pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
     
    -            dtype = {}
    +                if self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "true") \
    +                        .lower() == "true":
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed by the reason below:\n  %s\n"
    +                        "Attempts non-optimization as "
    +                        "'spark.sql.execution.arrow.fallback.enabled' is set to "
    +                        "true." % _exception_message(e))
    +                    warnings.warn(msg)
    +                    should_fallback = True
    +                else:
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed by the reason below:\n  %s\n"
    --- End diff --
    
    Hm ... I tried to like make a `"toPandas attempted Arrow optimization because ... %s"` and reuse it but seems a little bit overkill.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1061/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/20678


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r170809925
  
    --- Diff: docs/sql-programming-guide.md ---
    @@ -1800,6 +1800,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
     ## Upgrading From Spark SQL 2.3 to 2.4
     
       - Since Spark 2.4, Spark maximizes the usage of a vectorized ORC reader for ORC files by default. To do that, `spark.sql.orc.impl` and `spark.sql.orc.filterPushdown` change their default values to `native` and `true` respectively.
    +  - In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unabled to be used whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched by `spark.sql.execution.arrow.fallback.enabled`.
    --- End diff --
    
    Not only in migration section, I think we should also document this config in the section like `PySpark Usage Guide for Pandas with Apache Arrow`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r171089207
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3493,19 +3514,30 @@ def create_pandas_data_frame(self):
             data_dict["4_float_t"] = np.float32(data_dict["4_float_t"])
             return pd.DataFrame(data=data_dict)
     
    -    def test_unsupported_datatype(self):
    -        schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
    -        df = self.spark.createDataFrame([(None,)], schema=schema)
    -        with QuietTest(self.sc):
    -            with self.assertRaisesRegexp(Exception, 'Unsupported type'):
    -                df.toPandas()
    +    def test_toPandas_fallback_enabled(self):
    +        import pandas as pd
     
    -        df = self.spark.createDataFrame([(None,)], schema="a binary")
    -        with QuietTest(self.sc):
    -            with self.assertRaisesRegexp(
    -                    Exception,
    -                    'Unsupported type.*\nNote: toPandas attempted Arrow optimization because'):
    -                df.toPandas()
    +        with self.sql_conf("spark.sql.execution.arrow.fallback.enabled", True):
    +            schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
    +            df = self.spark.createDataFrame([({u'a': 1},)], schema=schema)
    +            with QuietTest(self.sc):
    +                with warnings.catch_warnings(record=True) as warns:
    +                    pdf = df.toPandas()
    +                    # Catch and check the last UserWarning.
    +                    user_warns = [
    +                        warn.message for warn in warns if isinstance(warn.message, UserWarning)]
    +                    self.assertTrue(len(user_warns) > 0)
    +                    self.assertTrue(
    +                        "Attempts non-optimization" in _exception_message(user_warns[-1]))
    +                    self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 1}]}))
    +
    +    def test_toPandas_fallback_disabled(self):
    +        with self.sql_conf("spark.sql.execution.arrow.fallback.enabled", False):
    --- End diff --
    
    Will fix it for using a `dict` here soon.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1320/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged to master.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #87695 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87695/testReport)** for PR 20678 at commit [`7641fd0`](https://github.com/apache/spark/commit/7641fd090eabb160282a045047c5469f64ad2158).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r170792103
  
    --- Diff: python/pyspark/sql/dataframe.py ---
    @@ -1986,55 +1986,89 @@ def toPandas(self):
                 timezone = None
     
             if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
    +            should_fallback = False
                 try:
    -                from pyspark.sql.types import _check_dataframe_convert_date, \
    -                    _check_dataframe_localize_timestamps, to_arrow_schema
    +                from pyspark.sql.types import to_arrow_schema
                     from pyspark.sql.utils import require_minimum_pyarrow_version
    +
                     require_minimum_pyarrow_version()
    -                import pyarrow
                     to_arrow_schema(self.schema)
    -                tables = self._collectAsArrow()
    -                if tables:
    -                    table = pyarrow.concat_tables(tables)
    -                    pdf = table.to_pandas()
    -                    pdf = _check_dataframe_convert_date(pdf, self.schema)
    -                    return _check_dataframe_localize_timestamps(pdf, timezone)
    -                else:
    -                    return pd.DataFrame.from_records([], columns=self.columns)
                 except Exception as e:
    -                msg = (
    -                    "Note: toPandas attempted Arrow optimization because "
    -                    "'spark.sql.execution.arrow.enabled' is set to true. Please set it to false "
    -                    "to disable this.")
    -                raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
    -        else:
    -            pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
     
    -            dtype = {}
    +                if self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "false") \
    --- End diff --
    
    We should use the same default value `"true"` as the default value defined in `SQLConf`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87673/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87695/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1083/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #87962 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87962/testReport)** for PR 20678 at commit [`af60cb7`](https://github.com/apache/spark/commit/af60cb75b52479fab636a20fd4face25aa9791e3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87674/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #88016 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88016/testReport)** for PR 20678 at commit [`b5bea82`](https://github.com/apache/spark/commit/b5bea82a9b84a3478b634052aa2662a44311a512).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Will try to clean up soon.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    gentle ping, I believe this is ready for another look.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1084/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r170910402
  
    --- Diff: docs/sql-programming-guide.md ---
    @@ -1800,6 +1800,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
     ## Upgrading From Spark SQL 2.3 to 2.4
     
       - Since Spark 2.4, Spark maximizes the usage of a vectorized ORC reader for ORC files by default. To do that, `spark.sql.orc.impl` and `spark.sql.orc.filterPushdown` change their default values to `native` and `true` respectively.
    +  - In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unabled to be used whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched by `spark.sql.execution.arrow.fallback.enabled`.
    --- End diff --
    
    Yup, added.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r170760936
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1068,6 +1068,14 @@ object SQLConf {
           .booleanConf
           .createWithDefault(false)
     
    +  val ARROW_FALLBACK_ENABLE =
    +    buildConf("spark.sql.execution.arrow.fallback.enabled")
    +      .doc("When true, the optimization by 'spark.sql.execution.arrow.enabled' " +
    +        "could be disabled when it is unable to be used, and fallback to " +
    +        "non-optimization.")
    --- End diff --
    
    Just a suggestion: "When true, optimizations enabled by 'spark.sql.execution.arrow.enabled' will fallback automatically to non-optimized implementations if an error occurs."


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r172540555
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3493,19 +3519,30 @@ def create_pandas_data_frame(self):
             data_dict["4_float_t"] = np.float32(data_dict["4_float_t"])
             return pd.DataFrame(data=data_dict)
     
    -    def test_unsupported_datatype(self):
    -        schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
    -        df = self.spark.createDataFrame([(None,)], schema=schema)
    -        with QuietTest(self.sc):
    -            with self.assertRaisesRegexp(Exception, 'Unsupported type'):
    -                df.toPandas()
    +    def test_toPandas_fallback_enabled(self):
    +        import pandas as pd
     
    -        df = self.spark.createDataFrame([(None,)], schema="a binary")
    -        with QuietTest(self.sc):
    -            with self.assertRaisesRegexp(
    -                    Exception,
    -                    'Unsupported type.*\nNote: toPandas attempted Arrow optimization because'):
    -                df.toPandas()
    +        with self.sql_conf({"spark.sql.execution.arrow.fallback.enabled": True}):
    +            schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
    +            df = self.spark.createDataFrame([({u'a': 1},)], schema=schema)
    +            with QuietTest(self.sc):
    +                with warnings.catch_warnings(record=True) as warns:
    +                    pdf = df.toPandas()
    +                    # Catch and check the last UserWarning.
    +                    user_warns = [
    +                        warn.message for warn in warns if isinstance(warn.message, UserWarning)]
    +                    self.assertTrue(len(user_warns) > 0)
    +                    self.assertTrue(
    +                        "Attempts non-optimization" in _exception_message(user_warns[-1]))
    +                    self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 1}]}))
    +
    +    def test_toPandas_fallback_disabled(self):
    +        with self.sql_conf({"spark.sql.execution.arrow.fallback.enabled": False}):
    --- End diff --
    
    Hm .. yup. I don't feel strongly. Will remove it out.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #87785 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87785/testReport)** for PR 20678 at commit [`ed30c20`](https://github.com/apache/spark/commit/ed30c205d95a6555475a06376f0d88e53e2f3da3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    cc @ueshin 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1060/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1326/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r172087998
  
    --- Diff: python/pyspark/sql/dataframe.py ---
    @@ -1986,55 +1986,91 @@ def toPandas(self):
                 timezone = None
     
             if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
    +            use_arrow = True
                 try:
    -                from pyspark.sql.types import _check_dataframe_convert_date, \
    -                    _check_dataframe_localize_timestamps, to_arrow_schema
    +                from pyspark.sql.types import to_arrow_schema
                     from pyspark.sql.utils import require_minimum_pyarrow_version
    +
                     require_minimum_pyarrow_version()
    -                import pyarrow
                     to_arrow_schema(self.schema)
    -                tables = self._collectAsArrow()
    -                if tables:
    -                    table = pyarrow.concat_tables(tables)
    -                    pdf = table.to_pandas()
    -                    pdf = _check_dataframe_convert_date(pdf, self.schema)
    -                    return _check_dataframe_localize_timestamps(pdf, timezone)
    -                else:
    -                    return pd.DataFrame.from_records([], columns=self.columns)
                 except Exception as e:
    -                msg = (
    -                    "Note: toPandas attempted Arrow optimization because "
    -                    "'spark.sql.execution.arrow.enabled' is set to true. Please set it to false "
    -                    "to disable this.")
    -                raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
    -        else:
    -            pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
     
    -            dtype = {}
    +                if self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "true") \
    +                        .lower() == "true":
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed by the reason below:\n  %s\n"
    +                        "Attempts non-optimization as "
    +                        "'spark.sql.execution.arrow.fallback.enabled' is set to "
    +                        "true." % _exception_message(e))
    +                    warnings.warn(msg)
    +                    use_arrow = False
    +                else:
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed by the reason below:\n  %s\n"
    +                        "For fallback to non-optimization automatically, please set true to "
    +                        "'spark.sql.execution.arrow.fallback.enabled'." % _exception_message(e))
    +                    raise RuntimeError(msg)
    +
    +            # Try to use Arrow optimization when the schema is supported and the required version
    +            # of PyArrow is found, if 'spark.sql.execution.arrow.fallback.enabled' is enabled.
    --- End diff --
    
    `spark.sql.execution.arrow.enabled` instead of `spark.sql.execution.arrow.fallback.enabled`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87785/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r170810197
  
    --- Diff: python/pyspark/sql/dataframe.py ---
    @@ -1986,55 +1986,87 @@ def toPandas(self):
                 timezone = None
     
             if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
    +            should_fallback = False
                 try:
    -                from pyspark.sql.types import _check_dataframe_convert_date, \
    -                    _check_dataframe_localize_timestamps, to_arrow_schema
    +                from pyspark.sql.types import to_arrow_schema
                     from pyspark.sql.utils import require_minimum_pyarrow_version
    +
                     require_minimum_pyarrow_version()
    -                import pyarrow
                     to_arrow_schema(self.schema)
    -                tables = self._collectAsArrow()
    -                if tables:
    -                    table = pyarrow.concat_tables(tables)
    -                    pdf = table.to_pandas()
    -                    pdf = _check_dataframe_convert_date(pdf, self.schema)
    -                    return _check_dataframe_localize_timestamps(pdf, timezone)
    -                else:
    -                    return pd.DataFrame.from_records([], columns=self.columns)
                 except Exception as e:
    -                msg = (
    -                    "Note: toPandas attempted Arrow optimization because "
    -                    "'spark.sql.execution.arrow.enabled' is set to true. Please set it to false "
    -                    "to disable this.")
    -                raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
    -        else:
    -            pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
     
    -            dtype = {}
    +                if self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "true") \
    +                        .lower() == "true":
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed by the reason below:\n  %s\n"
    +                        "Attempts non-optimization as "
    +                        "'spark.sql.execution.arrow.fallback.enabled' is set to "
    +                        "true." % _exception_message(e))
    +                    warnings.warn(msg)
    +                    should_fallback = True
    +                else:
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed by the reason below:\n  %s\n"
    --- End diff --
    
    `toPandas attempted Arrow optimization because...` repeats three times here, maybe we can dedup it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r171111186
  
    --- Diff: python/pyspark/sql/dataframe.py ---
    @@ -1986,55 +1986,89 @@ def toPandas(self):
                 timezone = None
     
             if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
    +            should_fallback = False
                 try:
    -                from pyspark.sql.types import _check_dataframe_convert_date, \
    -                    _check_dataframe_localize_timestamps, to_arrow_schema
    +                from pyspark.sql.types import to_arrow_schema
                     from pyspark.sql.utils import require_minimum_pyarrow_version
    +
                     require_minimum_pyarrow_version()
    -                import pyarrow
                     to_arrow_schema(self.schema)
    -                tables = self._collectAsArrow()
    -                if tables:
    -                    table = pyarrow.concat_tables(tables)
    -                    pdf = table.to_pandas()
    -                    pdf = _check_dataframe_convert_date(pdf, self.schema)
    -                    return _check_dataframe_localize_timestamps(pdf, timezone)
    -                else:
    -                    return pd.DataFrame.from_records([], columns=self.columns)
                 except Exception as e:
    -                msg = (
    -                    "Note: toPandas attempted Arrow optimization because "
    -                    "'spark.sql.execution.arrow.enabled' is set to true. Please set it to false "
    -                    "to disable this.")
    -                raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
    -        else:
    -            pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
     
    -            dtype = {}
    +                if self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "true") \
    +                        .lower() == "true":
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed by the reason below:\n  %s\n"
    +                        "Attempts non-optimization as "
    +                        "'spark.sql.execution.arrow.fallback.enabled' is set to "
    +                        "true." % _exception_message(e))
    +                    warnings.warn(msg)
    +                    should_fallback = True
    +                else:
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed by the reason below:\n  %s\n"
    +                        "For fallback to non-optimization automatically, please set true to "
    +                        "'spark.sql.execution.arrow.fallback.enabled'." % _exception_message(e))
    +                    raise RuntimeError(msg)
    +
    +            if not should_fallback:
    +                try:
    +                    from pyspark.sql.types import _check_dataframe_convert_date, \
    +                        _check_dataframe_localize_timestamps
    +                    import pyarrow
    +
    +                    tables = self._collectAsArrow()
    +                    if tables:
    +                        table = pyarrow.concat_tables(tables)
    +                        pdf = table.to_pandas()
    +                        pdf = _check_dataframe_convert_date(pdf, self.schema)
    +                        return _check_dataframe_localize_timestamps(pdf, timezone)
    +                    else:
    +                        return pd.DataFrame.from_records([], columns=self.columns)
    +                except Exception as e:
    +                    # We might have to allow fallback here as well but multiple Spark jobs can
    +                    # be executed. So, simply fail in this case for now.
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed unexpectedly:\n  %s\n"
    +                        "Note that 'spark.sql.execution.arrow.fallback.enabled' does "
    --- End diff --
    
    +1 good job having this explanation in the exception


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #87719 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87719/testReport)** for PR 20678 at commit [`229a5f7`](https://github.com/apache/spark/commit/229a5f786028a8b50af7429da9f02ec70b7d4e49).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #88007 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88007/testReport)** for PR 20678 at commit [`b5bea82`](https://github.com/apache/spark/commit/b5bea82a9b84a3478b634052aa2662a44311a512).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r171138898
  
    --- Diff: python/pyspark/sql/dataframe.py ---
    @@ -1986,55 +1986,89 @@ def toPandas(self):
                 timezone = None
     
             if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
    +            should_fallback = False
                 try:
    -                from pyspark.sql.types import _check_dataframe_convert_date, \
    -                    _check_dataframe_localize_timestamps, to_arrow_schema
    +                from pyspark.sql.types import to_arrow_schema
                     from pyspark.sql.utils import require_minimum_pyarrow_version
    +
                     require_minimum_pyarrow_version()
    -                import pyarrow
                     to_arrow_schema(self.schema)
    -                tables = self._collectAsArrow()
    -                if tables:
    -                    table = pyarrow.concat_tables(tables)
    -                    pdf = table.to_pandas()
    -                    pdf = _check_dataframe_convert_date(pdf, self.schema)
    -                    return _check_dataframe_localize_timestamps(pdf, timezone)
    -                else:
    -                    return pd.DataFrame.from_records([], columns=self.columns)
                 except Exception as e:
    -                msg = (
    -                    "Note: toPandas attempted Arrow optimization because "
    -                    "'spark.sql.execution.arrow.enabled' is set to true. Please set it to false "
    -                    "to disable this.")
    -                raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
    -        else:
    -            pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
     
    -            dtype = {}
    +                if self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "true") \
    +                        .lower() == "true":
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed by the reason below:\n  %s\n"
    +                        "Attempts non-optimization as "
    +                        "'spark.sql.execution.arrow.fallback.enabled' is set to "
    +                        "true." % _exception_message(e))
    +                    warnings.warn(msg)
    +                    should_fallback = True
    +                else:
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed by the reason below:\n  %s\n"
    +                        "For fallback to non-optimization automatically, please set true to "
    +                        "'spark.sql.execution.arrow.fallback.enabled'." % _exception_message(e))
    +                    raise RuntimeError(msg)
    +
    +            if not should_fallback:
    --- End diff --
    
    Correct, but there's one more - we fallback if PyArrow is not installed. Will add some comments to make this easier to read.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #88065 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88065/testReport)** for PR 20678 at commit [`4ccaa81`](https://github.com/apache/spark/commit/4ccaa81038e9e87b9772394f5b39866b65c798d8).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r170809560
  
    --- Diff: python/pyspark/sql/dataframe.py ---
    @@ -1986,55 +1986,87 @@ def toPandas(self):
                 timezone = None
     
             if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
    +            should_fallback = False
                 try:
    -                from pyspark.sql.types import _check_dataframe_convert_date, \
    -                    _check_dataframe_localize_timestamps, to_arrow_schema
    +                from pyspark.sql.types import to_arrow_schema
                     from pyspark.sql.utils import require_minimum_pyarrow_version
    +
                     require_minimum_pyarrow_version()
    -                import pyarrow
                     to_arrow_schema(self.schema)
    -                tables = self._collectAsArrow()
    -                if tables:
    -                    table = pyarrow.concat_tables(tables)
    -                    pdf = table.to_pandas()
    -                    pdf = _check_dataframe_convert_date(pdf, self.schema)
    -                    return _check_dataframe_localize_timestamps(pdf, timezone)
    -                else:
    -                    return pd.DataFrame.from_records([], columns=self.columns)
                 except Exception as e:
    -                msg = (
    -                    "Note: toPandas attempted Arrow optimization because "
    -                    "'spark.sql.execution.arrow.enabled' is set to true. Please set it to false "
    -                    "to disable this.")
    -                raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
    -        else:
    -            pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
     
    -            dtype = {}
    +                if self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "true") \
    +                        .lower() == "true":
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed by the reason below:\n  %s\n"
    +                        "Attempts non-optimization as "
    +                        "'spark.sql.execution.arrow.fallback.enabled' is set to "
    +                        "true." % _exception_message(e))
    +                    warnings.warn(msg)
    +                    should_fallback = True
    +                else:
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed by the reason below:\n  %s\n"
    +                        "For fallback to non-optimization automatically, please set true to "
    +                        "'spark.sql.execution.arrow.fallback.enabled'." % _exception_message(e))
    +                    raise RuntimeError(msg)
    +
    +            if not should_fallback:
    +                try:
    +                    from pyspark.sql.types import _check_dataframe_convert_date, \
    +                        _check_dataframe_localize_timestamps
    +                    import pyarrow
    +
    +                    tables = self._collectAsArrow()
    +                    if tables:
    +                        table = pyarrow.concat_tables(tables)
    +                        pdf = table.to_pandas()
    +                        pdf = _check_dataframe_convert_date(pdf, self.schema)
    +                        return _check_dataframe_localize_timestamps(pdf, timezone)
    +                    else:
    +                        return pd.DataFrame.from_records([], columns=self.columns)
    +                except Exception as e:
    +                    # We might have to allow fallback here as well but multiple Spark jobs can
    +                    # be executed. So, simply fail in this case for now.
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed unexpectedly:\n"
    +                        "  %s" % _exception_message(e))
    --- End diff --
    
    No need to mention fallback mode in the message like above?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r170623237
  
    --- Diff: python/pyspark/sql/dataframe.py ---
    @@ -1986,55 +1986,89 @@ def toPandas(self):
                 timezone = None
     
             if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
    +            _should_fallback = False
                 try:
    -                from pyspark.sql.types import _check_dataframe_convert_date, \
    -                    _check_dataframe_localize_timestamps, to_arrow_schema
    +                from pyspark.sql.types import to_arrow_schema
                     from pyspark.sql.utils import require_minimum_pyarrow_version
    +
                     require_minimum_pyarrow_version()
    -                import pyarrow
                     to_arrow_schema(self.schema)
    -                tables = self._collectAsArrow()
    -                if tables:
    -                    table = pyarrow.concat_tables(tables)
    -                    pdf = table.to_pandas()
    -                    pdf = _check_dataframe_convert_date(pdf, self.schema)
    -                    return _check_dataframe_localize_timestamps(pdf, timezone)
    -                else:
    -                    return pd.DataFrame.from_records([], columns=self.columns)
                 except Exception as e:
    -                msg = (
    -                    "Note: toPandas attempted Arrow optimization because "
    -                    "'spark.sql.execution.arrow.enabled' is set to true. Please set it to false "
    -                    "to disable this.")
    -                raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
    -        else:
    -            pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
     
    -            dtype = {}
    +                if self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "false") \
    +                        .lower() == "true":
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed by the reason below:\n"
    +                        "  %s\n"
    +                        "Attempts non-optimization as "
    +                        "'spark.sql.execution.arrow.fallback.enabled' is set to "
    +                        "true." % _exception_message(e))
    +                    warnings.warn(msg)
    +                    _should_fallback = True
    +                else:
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed by the reason below:\n"
    +                        "  %s\n"
    +                        "For fallback to non-optimization automatically, please set true to "
    +                        "'spark.sql.execution.arrow.fallback.enabled'." % _exception_message(e))
    +                    raise RuntimeError(msg)
    +
    +            if not _should_fallback:
    +                try:
    +                    from pyspark.sql.types import _check_dataframe_convert_date, \
    +                        _check_dataframe_localize_timestamps
    +                    import pyarrow
    +
    +                    tables = self._collectAsArrow()
    +                    if tables:
    +                        table = pyarrow.concat_tables(tables)
    +                        pdf = table.to_pandas()
    +                        pdf = _check_dataframe_convert_date(pdf, self.schema)
    +                        return _check_dataframe_localize_timestamps(pdf, timezone)
    +                    else:
    +                        return pd.DataFrame.from_records([], columns=self.columns)
    +                except Exception as e:
    +                    # We might have to allow fallback here as well but multiple Spark jobs can
    +                    # be executed. So, simply fail in this case for now.
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed unexpectedly:\n"
    +                        "  %s" % _exception_message(e))
    +                    raise RuntimeError(msg)
    +
    +        # Below is toPandas without Arrow optimization.
    --- End diff --
    
    Likewise, the change from here is due to removed `else:` block.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r171111018
  
    --- Diff: docs/sql-programming-guide.md ---
    @@ -1689,6 +1689,10 @@ using the call `toPandas()` and when creating a Spark DataFrame from a Pandas Da
     `createDataFrame(pandas_df)`. To use Arrow when executing these calls, users need to first set
     the Spark configuration 'spark.sql.execution.arrow.enabled' to 'true'. This is disabled by default.
     
    +In addition, optimizations enabled by 'spark.sql.execution.arrow.enabled' will fallback automatically
    +to non-optimized implementations if an error occurs. This can be controlled by
    --- End diff --
    
    So we need to be clear that we only do this if an error occurs in schema parsing, not any error.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r170902707
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3493,19 +3514,30 @@ def create_pandas_data_frame(self):
             data_dict["4_float_t"] = np.float32(data_dict["4_float_t"])
             return pd.DataFrame(data=data_dict)
     
    -    def test_unsupported_datatype(self):
    -        schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
    -        df = self.spark.createDataFrame([(None,)], schema=schema)
    -        with QuietTest(self.sc):
    -            with self.assertRaisesRegexp(Exception, 'Unsupported type'):
    -                df.toPandas()
    +    def test_toPandas_fallback_enabled(self):
    +        import pandas as pd
     
    -        df = self.spark.createDataFrame([(None,)], schema="a binary")
    -        with QuietTest(self.sc):
    -            with self.assertRaisesRegexp(
    -                    Exception,
    -                    'Unsupported type.*\nNote: toPandas attempted Arrow optimization because'):
    -                df.toPandas()
    +        with self.sql_conf("spark.sql.execution.arrow.fallback.enabled", True):
    +            schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
    +            df = self.spark.createDataFrame([({u'a': 1},)], schema=schema)
    +            with QuietTest(self.sc):
    +                with warnings.catch_warnings(record=True) as warns:
    +                    pdf = df.toPandas()
    +                    # Catch and check the last UserWarning.
    +                    user_warns = [
    +                        warn.message for warn in warns if isinstance(warn.message, UserWarning)]
    +                    self.assertTrue(len(user_warns) > 0)
    +                    self.assertTrue(
    +                        "Attempts non-optimization" in _exception_message(user_warns[-1]))
    +                    self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 1}]}))
    +
    +    def test_toPandas_fallback_disabled(self):
    +        with self.sql_conf("spark.sql.execution.arrow.fallback.enabled", False):
    --- End diff --
    
    Yea, I was thinking that too. I took a quick look for the rest of tests and seems we are fine with a single pair for now. Will fix it as so in place in the future if you are okay with that too.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #87695 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87695/testReport)** for PR 20678 at commit [`7641fd0`](https://github.com/apache/spark/commit/7641fd090eabb160282a045047c5469f64ad2158).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r170763497
  
    --- Diff: python/pyspark/sql/dataframe.py ---
    @@ -1986,55 +1986,89 @@ def toPandas(self):
                 timezone = None
     
             if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
    +            should_fallback = False
                 try:
    -                from pyspark.sql.types import _check_dataframe_convert_date, \
    -                    _check_dataframe_localize_timestamps, to_arrow_schema
    +                from pyspark.sql.types import to_arrow_schema
                     from pyspark.sql.utils import require_minimum_pyarrow_version
    +
                     require_minimum_pyarrow_version()
    -                import pyarrow
                     to_arrow_schema(self.schema)
    -                tables = self._collectAsArrow()
    -                if tables:
    -                    table = pyarrow.concat_tables(tables)
    -                    pdf = table.to_pandas()
    -                    pdf = _check_dataframe_convert_date(pdf, self.schema)
    -                    return _check_dataframe_localize_timestamps(pdf, timezone)
    -                else:
    -                    return pd.DataFrame.from_records([], columns=self.columns)
                 except Exception as e:
    -                msg = (
    -                    "Note: toPandas attempted Arrow optimization because "
    -                    "'spark.sql.execution.arrow.enabled' is set to true. Please set it to false "
    -                    "to disable this.")
    -                raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
    -        else:
    -            pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
     
    -            dtype = {}
    +                if self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "false") \
    +                        .lower() == "true":
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed by the reason below:\n"
    +                        "  %s\n"
    --- End diff --
    
    I think it would be fine to move this line to the previous to make it a little more compact, but up to you.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r172751164
  
    --- Diff: docs/sql-programming-guide.md ---
    @@ -1800,6 +1800,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
     ## Upgrading From Spark SQL 2.3 to 2.4
     
       - Since Spark 2.4, Spark maximizes the usage of a vectorized ORC reader for ORC files by default. To do that, `spark.sql.orc.impl` and `spark.sql.orc.filterPushdown` change their default values to `native` and `true` respectively.
    +  - In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unabled to be used whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched by `spark.sql.execution.arrow.fallback.enabled`.
    --- End diff --
    
    `which can be switched by` -> `which can be switched on by` or `which can be switched on with`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1371/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #87962 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87962/testReport)** for PR 20678 at commit [`af60cb7`](https://github.com/apache/spark/commit/af60cb75b52479fab636a20fd4face25aa9791e3).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #87785 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87785/testReport)** for PR 20678 at commit [`ed30c20`](https://github.com/apache/spark/commit/ed30c205d95a6555475a06376f0d88e53e2f3da3).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #87719 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87719/testReport)** for PR 20678 at commit [`229a5f7`](https://github.com/apache/spark/commit/229a5f786028a8b50af7429da9f02ec70b7d4e49).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r170993760
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3493,19 +3514,30 @@ def create_pandas_data_frame(self):
             data_dict["4_float_t"] = np.float32(data_dict["4_float_t"])
             return pd.DataFrame(data=data_dict)
     
    -    def test_unsupported_datatype(self):
    -        schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
    -        df = self.spark.createDataFrame([(None,)], schema=schema)
    -        with QuietTest(self.sc):
    -            with self.assertRaisesRegexp(Exception, 'Unsupported type'):
    -                df.toPandas()
    +    def test_toPandas_fallback_enabled(self):
    +        import pandas as pd
     
    -        df = self.spark.createDataFrame([(None,)], schema="a binary")
    -        with QuietTest(self.sc):
    -            with self.assertRaisesRegexp(
    -                    Exception,
    -                    'Unsupported type.*\nNote: toPandas attempted Arrow optimization because'):
    -                df.toPandas()
    +        with self.sql_conf("spark.sql.execution.arrow.fallback.enabled", True):
    +            schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
    +            df = self.spark.createDataFrame([({u'a': 1},)], schema=schema)
    +            with QuietTest(self.sc):
    +                with warnings.catch_warnings(record=True) as warns:
    +                    pdf = df.toPandas()
    +                    # Catch and check the last UserWarning.
    +                    user_warns = [
    +                        warn.message for warn in warns if isinstance(warn.message, UserWarning)]
    +                    self.assertTrue(len(user_warns) > 0)
    +                    self.assertTrue(
    +                        "Attempts non-optimization" in _exception_message(user_warns[-1]))
    +                    self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 1}]}))
    +
    +    def test_toPandas_fallback_disabled(self):
    +        with self.sql_conf("spark.sql.execution.arrow.fallback.enabled", False):
    --- End diff --
    
    Yeah, good idea!  +1 on using a `dict`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1154/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r171110674
  
    --- Diff: python/pyspark/sql/dataframe.py ---
    @@ -1986,55 +1986,89 @@ def toPandas(self):
                 timezone = None
     
             if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
    +            should_fallback = False
    --- End diff --
    
    This variable name is a little confusing to me while I'm tracing the code. How about "use_arrow" and swap the meanings? Because right now if a user doesn't have arrow enabled we skip the arrow conversion because of the value of should_fallback which seems.... odd.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r172267148
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3493,19 +3519,30 @@ def create_pandas_data_frame(self):
             data_dict["4_float_t"] = np.float32(data_dict["4_float_t"])
             return pd.DataFrame(data=data_dict)
     
    -    def test_unsupported_datatype(self):
    -        schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
    -        df = self.spark.createDataFrame([(None,)], schema=schema)
    -        with QuietTest(self.sc):
    -            with self.assertRaisesRegexp(Exception, 'Unsupported type'):
    -                df.toPandas()
    +    def test_toPandas_fallback_enabled(self):
    +        import pandas as pd
     
    -        df = self.spark.createDataFrame([(None,)], schema="a binary")
    -        with QuietTest(self.sc):
    -            with self.assertRaisesRegexp(
    -                    Exception,
    -                    'Unsupported type.*\nNote: toPandas attempted Arrow optimization because'):
    -                df.toPandas()
    +        with self.sql_conf({"spark.sql.execution.arrow.fallback.enabled": True}):
    +            schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
    +            df = self.spark.createDataFrame([({u'a': 1},)], schema=schema)
    +            with QuietTest(self.sc):
    +                with warnings.catch_warnings(record=True) as warns:
    +                    pdf = df.toPandas()
    +                    # Catch and check the last UserWarning.
    +                    user_warns = [
    +                        warn.message for warn in warns if isinstance(warn.message, UserWarning)]
    +                    self.assertTrue(len(user_warns) > 0)
    +                    self.assertTrue(
    +                        "Attempts non-optimization" in _exception_message(user_warns[-1]))
    +                    self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 1}]}))
    +
    +    def test_toPandas_fallback_disabled(self):
    +        with self.sql_conf({"spark.sql.execution.arrow.fallback.enabled": False}):
    --- End diff --
    
    Do you still want this since it is disabled in setUpClass? It doesn't hurt to have it, but just thought I'd ask


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88016/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r170799189
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3493,19 +3495,42 @@ def create_pandas_data_frame(self):
             data_dict["4_float_t"] = np.float32(data_dict["4_float_t"])
             return pd.DataFrame(data=data_dict)
     
    -    def test_unsupported_datatype(self):
    -        schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
    -        df = self.spark.createDataFrame([(None,)], schema=schema)
    -        with QuietTest(self.sc):
    -            with self.assertRaisesRegexp(Exception, 'Unsupported type'):
    -                df.toPandas()
    +    @contextmanager
    +    def arrow_fallback(self, enabled):
    --- End diff --
    
    Yup, makes sense. Will give a shot.
    
    BTW, while we are here, I was thinking of adding a more generalized version of an util like `arrow_fallback` to reduce configuration specific codes in the test scope but was hesitant because this approach is new to PySpark. WDTY? I will do another PR for this cleanup if we all feel in the same way. Cc @ueshin too.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1278/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87696/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r171155732
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1518,7 +1525,9 @@ class SQLConf extends Serializable with Logging {
     
       def rangeExchangeSampleSizePerPartition: Int = getConf(RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION)
     
    -  def arrowEnable: Boolean = getConf(ARROW_EXECUTION_ENABLE)
    +  def arrowEnable: Boolean = getConf(ARROW_EXECUTION_ENABLED)
    +
    +  def arrowFallbackEnable: Boolean = getConf(ARROW_FALLBACK_ENABLED)
    --- End diff --
    
    nit: Have we used this `arrowFallbackEnable` definition?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #88007 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88007/testReport)** for PR 20678 at commit [`b5bea82`](https://github.com/apache/spark/commit/b5bea82a9b84a3478b634052aa2662a44311a512).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r172751054
  
    --- Diff: docs/sql-programming-guide.md ---
    @@ -1689,6 +1689,10 @@ using the call `toPandas()` and when creating a Spark DataFrame from a Pandas Da
     `createDataFrame(pandas_df)`. To use Arrow when executing these calls, users need to first set
     the Spark configuration 'spark.sql.execution.arrow.enabled' to 'true'. This is disabled by default.
     
    +In addition, optimizations enabled by 'spark.sql.execution.arrow.enabled' could fallback automatically
    +to non-optimized implementations if an error occurs before the actual computation within Spark.
    --- End diff --
    
    very minor nit: `non-optimized implementations` --> `non-Arrow optimization implementation`
    
    this matches the description in the paragraph below


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r170799255
  
    --- Diff: python/pyspark/sql/dataframe.py ---
    @@ -1986,55 +1986,89 @@ def toPandas(self):
                 timezone = None
     
             if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
    +            should_fallback = False
                 try:
    -                from pyspark.sql.types import _check_dataframe_convert_date, \
    -                    _check_dataframe_localize_timestamps, to_arrow_schema
    +                from pyspark.sql.types import to_arrow_schema
                     from pyspark.sql.utils import require_minimum_pyarrow_version
    +
                     require_minimum_pyarrow_version()
    -                import pyarrow
                     to_arrow_schema(self.schema)
    -                tables = self._collectAsArrow()
    -                if tables:
    -                    table = pyarrow.concat_tables(tables)
    -                    pdf = table.to_pandas()
    -                    pdf = _check_dataframe_convert_date(pdf, self.schema)
    -                    return _check_dataframe_localize_timestamps(pdf, timezone)
    -                else:
    -                    return pd.DataFrame.from_records([], columns=self.columns)
                 except Exception as e:
    -                msg = (
    -                    "Note: toPandas attempted Arrow optimization because "
    -                    "'spark.sql.execution.arrow.enabled' is set to true. Please set it to false "
    -                    "to disable this.")
    -                raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
    -        else:
    -            pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
     
    -            dtype = {}
    +                if self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "false") \
    +                        .lower() == "true":
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed by the reason below:\n"
    +                        "  %s\n"
    --- End diff --
    
    No problem at all.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r171139748
  
    --- Diff: docs/sql-programming-guide.md ---
    @@ -1689,6 +1689,10 @@ using the call `toPandas()` and when creating a Spark DataFrame from a Pandas Da
     `createDataFrame(pandas_df)`. To use Arrow when executing these calls, users need to first set
     the Spark configuration 'spark.sql.execution.arrow.enabled' to 'true'. This is disabled by default.
     
    +In addition, optimizations enabled by 'spark.sql.execution.arrow.enabled' will fallback automatically
    +to non-optimized implementations if an error occurs. This can be controlled by
    --- End diff --
    
    Let me try to rephrase this doc a bit. The point I was trying to make in this fallback (for now) was, to only do the fallback before the actual distributed computation within Spark.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88065/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88007/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #87674 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87674/testReport)** for PR 20678 at commit [`7f87d25`](https://github.com/apache/spark/commit/7f87d2537488ca03c926f4d9c6318451c688ebe5).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #87674 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87674/testReport)** for PR 20678 at commit [`7f87d25`](https://github.com/apache/spark/commit/7f87d2537488ca03c926f4d9c6318451c688ebe5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #88065 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88065/testReport)** for PR 20678 at commit [`4ccaa81`](https://github.com/apache/spark/commit/4ccaa81038e9e87b9772394f5b39866b65c798d8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r171155800
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1518,7 +1525,9 @@ class SQLConf extends Serializable with Logging {
     
       def rangeExchangeSampleSizePerPartition: Int = getConf(RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION)
     
    -  def arrowEnable: Boolean = getConf(ARROW_EXECUTION_ENABLE)
    +  def arrowEnable: Boolean = getConf(ARROW_EXECUTION_ENABLED)
    --- End diff --
    
    Actually seems we don't use `arrowEnable` too.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r170813278
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3493,19 +3514,30 @@ def create_pandas_data_frame(self):
             data_dict["4_float_t"] = np.float32(data_dict["4_float_t"])
             return pd.DataFrame(data=data_dict)
     
    -    def test_unsupported_datatype(self):
    -        schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
    -        df = self.spark.createDataFrame([(None,)], schema=schema)
    -        with QuietTest(self.sc):
    -            with self.assertRaisesRegexp(Exception, 'Unsupported type'):
    -                df.toPandas()
    +    def test_toPandas_fallback_enabled(self):
    +        import pandas as pd
     
    -        df = self.spark.createDataFrame([(None,)], schema="a binary")
    -        with QuietTest(self.sc):
    -            with self.assertRaisesRegexp(
    -                    Exception,
    -                    'Unsupported type.*\nNote: toPandas attempted Arrow optimization because'):
    -                df.toPandas()
    +        with self.sql_conf("spark.sql.execution.arrow.fallback.enabled", True):
    +            schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
    +            df = self.spark.createDataFrame([({u'a': 1},)], schema=schema)
    +            with QuietTest(self.sc):
    +                with warnings.catch_warnings(record=True) as warns:
    +                    pdf = df.toPandas()
    +                    # Catch and check the last UserWarning.
    +                    user_warns = [
    +                        warn.message for warn in warns if isinstance(warn.message, UserWarning)]
    +                    self.assertTrue(len(user_warns) > 0)
    +                    self.assertTrue(
    +                        "Attempts non-optimization" in _exception_message(user_warns[-1]))
    +                    self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 1}]}))
    +
    +    def test_toPandas_fallback_disabled(self):
    +        with self.sql_conf("spark.sql.execution.arrow.fallback.enabled", False):
    --- End diff --
    
    Seems good, but how about using `dict` for setting multiple configs at the same time?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r170808505
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3493,19 +3514,30 @@ def create_pandas_data_frame(self):
             data_dict["4_float_t"] = np.float32(data_dict["4_float_t"])
             return pd.DataFrame(data=data_dict)
     
    -    def test_unsupported_datatype(self):
    -        schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
    -        df = self.spark.createDataFrame([(None,)], schema=schema)
    -        with QuietTest(self.sc):
    -            with self.assertRaisesRegexp(Exception, 'Unsupported type'):
    -                df.toPandas()
    +    def test_toPandas_fallback_enabled(self):
    +        import pandas as pd
     
    -        df = self.spark.createDataFrame([(None,)], schema="a binary")
    -        with QuietTest(self.sc):
    -            with self.assertRaisesRegexp(
    -                    Exception,
    -                    'Unsupported type.*\nNote: toPandas attempted Arrow optimization because'):
    -                df.toPandas()
    +        with self.sql_conf("spark.sql.execution.arrow.fallback.enabled", True):
    +            schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
    +            df = self.spark.createDataFrame([({u'a': 1},)], schema=schema)
    +            with QuietTest(self.sc):
    +                with warnings.catch_warnings(record=True) as warns:
    +                    pdf = df.toPandas()
    +                    # Catch and check the last UserWarning.
    +                    user_warns = [
    +                        warn.message for warn in warns if isinstance(warn.message, UserWarning)]
    +                    self.assertTrue(len(user_warns) > 0)
    +                    self.assertTrue(
    +                        "Attempts non-optimization" in _exception_message(user_warns[-1]))
    +                    self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 1}]}))
    +
    +    def test_toPandas_fallback_disabled(self):
    +        with self.sql_conf("spark.sql.execution.arrow.fallback.enabled", False):
    --- End diff --
    
    Hey @ueshin and @BryanCutler, do you guys like this idea?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r170813132
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1068,6 +1068,13 @@ object SQLConf {
           .booleanConf
           .createWithDefault(false)
     
    +  val ARROW_FALLBACK_ENABLE =
    --- End diff --
    
    `ARROW_FALLBACK_ENABLED` instead of `ARROW_FALLBACK_ENABLE`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #87673 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87673/testReport)** for PR 20678 at commit [`ff9d38b`](https://github.com/apache/spark/commit/ff9d38b691bd54c073db3b55983564cfbb0d903e).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #87696 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87696/testReport)** for PR 20678 at commit [`cfb08a1`](https://github.com/apache/spark/commit/cfb08a1d9b4fdea5a06605f53db90e1a7408be85).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #88016 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88016/testReport)** for PR 20678 at commit [`b5bea82`](https://github.com/apache/spark/commit/b5bea82a9b84a3478b634052aa2662a44311a512).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1104/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r170792766
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -666,8 +666,28 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
                     try:
                         return self._create_from_pandas_with_arrow(data, schema, timezone)
                     except Exception as e:
    -                    warnings.warn("Arrow will not be used in createDataFrame: %s" % str(e))
    -                    # Fallback to create DataFrame without arrow if raise some exception
    +                    from pyspark.util import _exception_message
    +
    +                    if self.conf.get("spark.sql.execution.arrow.fallback.enabled", "false") \
    --- End diff --
    
    ditto.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallb...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20678#discussion_r171110887
  
    --- Diff: python/pyspark/sql/dataframe.py ---
    @@ -1986,55 +1986,89 @@ def toPandas(self):
                 timezone = None
     
             if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
    +            should_fallback = False
                 try:
    -                from pyspark.sql.types import _check_dataframe_convert_date, \
    -                    _check_dataframe_localize_timestamps, to_arrow_schema
    +                from pyspark.sql.types import to_arrow_schema
                     from pyspark.sql.utils import require_minimum_pyarrow_version
    +
                     require_minimum_pyarrow_version()
    -                import pyarrow
                     to_arrow_schema(self.schema)
    -                tables = self._collectAsArrow()
    -                if tables:
    -                    table = pyarrow.concat_tables(tables)
    -                    pdf = table.to_pandas()
    -                    pdf = _check_dataframe_convert_date(pdf, self.schema)
    -                    return _check_dataframe_localize_timestamps(pdf, timezone)
    -                else:
    -                    return pd.DataFrame.from_records([], columns=self.columns)
                 except Exception as e:
    -                msg = (
    -                    "Note: toPandas attempted Arrow optimization because "
    -                    "'spark.sql.execution.arrow.enabled' is set to true. Please set it to false "
    -                    "to disable this.")
    -                raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
    -        else:
    -            pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
     
    -            dtype = {}
    +                if self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "true") \
    +                        .lower() == "true":
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed by the reason below:\n  %s\n"
    +                        "Attempts non-optimization as "
    +                        "'spark.sql.execution.arrow.fallback.enabled' is set to "
    +                        "true." % _exception_message(e))
    +                    warnings.warn(msg)
    +                    should_fallback = True
    +                else:
    +                    msg = (
    +                        "toPandas attempted Arrow optimization because "
    +                        "'spark.sql.execution.arrow.enabled' is set to true; however, "
    +                        "failed by the reason below:\n  %s\n"
    +                        "For fallback to non-optimization automatically, please set true to "
    +                        "'spark.sql.execution.arrow.fallback.enabled'." % _exception_message(e))
    +                    raise RuntimeError(msg)
    +
    +            if not should_fallback:
    --- End diff --
    
    So if I'm tracing the logic correctly, if arrow optimizations are enabled and there is an exception parsing the schema and we don't have fall back enabled we go down this code path or if we don't have arrow enabled we also go down this code path? It might make sense to add a comment here with what the intended times to go down this path are?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20678: [SPARK-23380][PYTHON] Adds a conf for Arrow fallback in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20678
  
    **[Test build #87696 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87696/testReport)** for PR 20678 at commit [`cfb08a1`](https://github.com/apache/spark/commit/cfb08a1d9b4fdea5a06605f53db90e1a7408be85).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org