You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by BryanCutler <gi...@git.apache.org> on 2017/10/09 19:34:49 UTC

[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

GitHub user BryanCutler opened a pull request:

    https://github.com/apache/spark/pull/19459

    [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFrame from Pandas

    ## What changes were proposed in this pull request?
    
    This change uses Arrow to optimize the creation of a Spark DataFrame from a Pandas DataFrame. The input df is sliced according to the default parallelism. The optimization is enabled with the existing conf "spark.sql.execution.arrow.enable" and is disabled by default.
    
    ## How was this patch tested?
    
    Added new unit test to create DataFrame with and without the optimization enabled, then compare results.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/BryanCutler/spark arrow-createDataFrame-from_pandas-SPARK-20791

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19459.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19459
    
----

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r143633890
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
             except Exception:
                 has_pandas = False
             if has_pandas and isinstance(data, pandas.DataFrame):
    -            if schema is None:
    -                schema = [str(x) for x in data.columns]
    -            data = [r.tolist() for r in data.to_records(index=False)]
    +            if self.conf.get("spark.sql.execution.arrow.enable", "false").lower() == "true" \
    --- End diff --
    
    The config name was modified to `spark.sql.execution.arrow.enabled` at d29d1e87995e02cb57ba3026c945c3cd66bb06e2 and af8a34c787dc3d68f5148a7d9975b52650bb7729.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #83233 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83233/testReport)** for PR 19459 at commit [`cfb1c3d`](https://github.com/apache/spark/commit/cfb1c3dd48abc7073cf0f98e529afae4e1157d78).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145862488
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,73 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type, _cast_pandas_series_type
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        if schema is None or isinstance(schema, list):
    +            batches = [pa.RecordBatch.from_pandas(pdf_slice, preserve_index=False)
    +                       for pdf_slice in pdf_slices]
    +
    +            # There will be at least 1 batch after slicing the pandas.DataFrame
    +            schema_from_arrow = from_arrow_schema(batches[0].schema)
    +
    +            # If passed schema as a list of names then rename fields
    +            if isinstance(schema, list):
    +                fields = []
    +                for i, field in enumerate(schema_from_arrow):
    +                    field.name = schema[i]
    +                    fields.append(field)
    +                schema = StructType(fields)
    +            else:
    +                schema = schema_from_arrow
    +        else:
    +            batches = []
    +            for i, pdf_slice in enumerate(pdf_slices):
    +
    +                # convert to series to pyarrow.Arrays to use mask when creating Arrow batches
    +                arrs = []
    +                names = []
    +                for c, (_, series) in enumerate(pdf_slice.iteritems()):
    +                    field = schema[c]
    +                    names.append(field.name)
    +                    t = to_arrow_type(field.dataType)
    +                    try:
    +                        # NOTE: casting is not necessary with Arrow >= 0.7
    +                        arrs.append(pa.Array.from_pandas(_cast_pandas_series_type(series, t),
    --- End diff --
    
    Any chance that the given data type for the field is not correct? By a wrong data type of a field, what the behavior of this casting?
    
    For example, for a Series of numpy.int16`, if the given data type is bytetype, this casting will turn it to numpy.int8, so we will get:
    ```python
    >>> s = pd.Series([1, 2, 10001], dtype=np.int16)                                                                   
    >>> s
    0        1
    1        2
    2    10001
    dtype: int16
    >>> s.astype(np.int8)
    0     1
    1     2
    2    17
    dtype: int8
    ```
    
    `createDataFrame` will check the data type of input data when converting to DataFrame. This implicit type casting seems inconsistent with original behavior.
    
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145200454
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelsize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        import os
    +        from tempfile import NamedTemporaryFile
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(df) // self.sparkContext.defaultParallelism)  # round int up
    +        df_slices = (df[start:start + step] for start in xrange(0, len(df), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(df_slice, schema=arrow_schema, preserve_index=False)
    +                   for df_slice in df_slices]
    +
    +        # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +        try:
    +            serializer = ArrowSerializer()
    +            serializer.dump_stream(batches, tempFile)
    +            tempFile.close()
    +            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +            jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +        finally:
    +            # readRDDFromFile eagerily reads the file so we can delete right after.
    +            os.unlink(tempFile.name)
    +
    +        # Create the Spark DataFrame, there will be at least 1 batch
    +        schema = from_arrow_schema(batches[0].schema)
    --- End diff --
    
    Oh pyarrow 0.4.1 is what is installed on Jenkins, so that is what I've been testing against. Maybe try that version?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144194374
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
             except Exception:
                 has_pandas = False
             if has_pandas and isinstance(data, pandas.DataFrame):
    -            if schema is None:
    -                schema = [str(x) for x in data.columns]
    -            data = [r.tolist() for r in data.to_records(index=False)]
    +            if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
    +                    and len(data) > 0:
    +                from pyspark.serializers import ArrowSerializer
    --- End diff --
    
    Maybe we should split this block to a method like `_createFromPandasDataFrame` as the same as the other create methods?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #83001 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83001/testReport)** for PR 19459 at commit [`f421e2d`](https://github.com/apache/spark/commit/f421e2da1e97dfbc7c80b7ae724b6ea9a472b220).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144930424
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelsize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        import os
    +        from tempfile import NamedTemporaryFile
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(df) // self.sparkContext.defaultParallelism)  # round int up
    +        df_slices = (df[start:start + step] for start in xrange(0, len(df), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(df_slice, schema=arrow_schema, preserve_index=False)
    +                   for df_slice in df_slices]
    +
    +        # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    --- End diff --
    
    Yeah, it is - I didn't want to mess around with the `parallelize()` logic so I left it alone.  If we were to make a common function it would look like this
    
    ```python
    
    def _dump_to_tempfile(data, serializer):
        tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
        try:
            serializer.dump_stream(c, tempFile)
            tempFile.close()
            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
            return readRDDFromFile(self._jsc, tempFile.name, numSlices)
        finally:
            # readRDDFromFile eagerily reads the file so we can delete right after.
            os.unlink(tempFile.name)
    ```
    
    and some changes to `parallelize` to call it
    ```python
    # Make sure we distribute data evenly if it's smaller than self.batchSize
    if "__len__" not in dir(c):
        c = list(c)    # Make it a list so we can compute its length
    batchSize = max(1, min(len(c) // numSlices, self._batchSize or 1024))
    serializer = BatchedSerializer(self._unbatched_serializer, batchSize)
    jrdd = _dump_to_tempfile(c, serializer)
    ```
    
    Let me know if you all think we should change this?



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82573/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #83579 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83579/testReport)** for PR 19459 at commit [`99ce1e4`](https://github.com/apache/spark/commit/99ce1e44f57c411af95b1c9d9c95f35f2c1652e1).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144194084
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
             except Exception:
                 has_pandas = False
             if has_pandas and isinstance(data, pandas.DataFrame):
    -            if schema is None:
    -                schema = [str(x) for x in data.columns]
    -            data = [r.tolist() for r in data.to_records(index=False)]
    +            if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
    +                    and len(data) > 0:
    +                from pyspark.serializers import ArrowSerializer
    +                from pyspark.sql.types import from_arrow_schema
    +                import pyarrow as pa
    +
    +                # Slice the DataFrame into batches
    +                split = -(-len(data) // self.sparkContext.defaultParallelism)  # round int up
    +                slices = (data[i:i + split] for i in xrange(0, len(data), split))
    +                batches = [pa.RecordBatch.from_pandas(sliced_df, preserve_index=False)
    +                           for sliced_df in slices]
    +
    +                # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +                import os
    +                from tempfile import NamedTemporaryFile
    +                tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +                try:
    +                    serializer = ArrowSerializer()
    +                    serializer.dump_stream(batches, tempFile)
    +                    tempFile.close()
    +                    readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +                    jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +                finally:
    +                    # readRDDFromFile eagerily reads the file so we can delete right after.
    +                    os.unlink(tempFile.name)
    +
    +                # Create the Spark DataFrame, there will be at least 1 batch
    +                schema = from_arrow_schema(batches[0].schema)
    --- End diff --
    
    What if a user specify the schema?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #83233 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83233/testReport)** for PR 19459 at commit [`cfb1c3d`](https://github.com/apache/spark/commit/cfb1c3dd48abc7073cf0f98e529afae4e1157d78).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145291702
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,39 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(pdf_slice, schema=arrow_schema, preserve_index=False)
    +                   for pdf_slice in pdf_slices]
    +
    +        # Verify schema, there will be at least 1 batch from pandas.DataFrame
    +        schema_from_arrow = from_arrow_schema(batches[0].schema)
    +        if schema is not None and schema != schema_from_arrow:
    +            raise ValueError("Supplied schema does not match result from Arrow\nsupplied: " +
    +                             "%s\n!=\nfrom Arrow: %s" % (str(schema), str(schema_from_arrow)))
    --- End diff --
    
    @ueshin and @HyukjinKwon after thinking about what to do when the schema is not equal, I have some concerns:
    
    1. Fallback to `createDataFrame` without Arrow - I implemented this and works fine, but there is no logging in python (afaik) so my concern is that it does this silently and causes bad performance and the user will not know why.
    
    2. Cast types using `astype` similar to `ArrowPandasSerializer.dump_stream` - The issue I see with that is if there are null values and ints have been promoted to floats, this works fine in `dump_stream` because we are working with pd.Series and  pyarrow allows us to pass a validity mask, which ignores the filled values.  There aren't options to pass in masks for pd.DataFrames, so I believe it will try to interpret whatever fill values are there and cause an error.  I can look into this more though.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r150618779
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -438,28 +438,70 @@ def _get_numpy_record_dtypes(self, rec):
                     curr_type = 'datetime64[us]'
                     has_rec_fix = True
                 record_type_list.append((str(col_names[i]), curr_type))
    -        return record_type_list if has_rec_fix else None
    +        return np.dtype(record_type_list) if has_rec_fix else None
     
    -    def _convert_from_pandas(self, pdf, schema):
    +    def _convert_from_pandas(self, pdf):
             """
              Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
    -         :return tuple of list of records and schema
    +         :return list of records
             """
    -        # If no schema supplied by user then get the names of columns only
    -        if schema is None:
    -            schema = [str(x) for x in pdf.columns]
     
             # Convert pandas.DataFrame to list of numpy records
             np_records = pdf.to_records(index=False)
     
             # Check if any columns need to be fixed for Spark to infer properly
             if len(np_records) > 0:
    -            record_type_list = self._get_numpy_record_dtypes(np_records[0])
    -            if record_type_list is not None:
    -                return [r.astype(record_type_list).tolist() for r in np_records], schema
    +            record_dtype = self._get_numpy_record_dtype(np_records[0])
    +            if record_dtype is not None:
    +                return [r.astype(record_dtype).tolist() for r in np_records]
     
             # Convert list of numpy records to python lists
    -        return [r.tolist() for r in np_records], schema
    +        return [r.tolist() for r in np_records]
    +
    +    def _create_from_pandas_with_arrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer, _create_batch
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType
    +        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
    +
    +        # Determine arrow types to coerce data when creating batches
    +        if isinstance(schema, StructType):
    +            arrow_types = [to_arrow_type(f.dataType) for f in schema.fields]
    +        elif isinstance(schema, DataType):
    +            raise ValueError("Single data type %s is not supported with Arrow" % str(schema))
    +        else:
    +            # Any timestamps must be coerced to be compatible with Spark
    +            arrow_types = [to_arrow_type(TimestampType())
    +                           if is_datetime64_dtype(t) or is_datetime64tz_dtype(t) else None
    +                           for t in pdf.dtypes]
    +
    +        # Slice the DataFrame to be batched
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        # Create Arrow record batches
    +        batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)])
    +                   for pdf_slice in pdf_slices]
    +
    +        # Create the Spark schema from the first Arrow batch (always at least 1 batch after slicing)
    +        if isinstance(schema, (list, tuple)):
    +            struct = from_arrow_schema(batches[0].schema)
    --- End diff --
    
    Sure, will do


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r149887860
  
    --- Diff: python/pyspark/serializers.py ---
    @@ -214,6 +214,14 @@ def __repr__(self):
     
     
     def _create_batch(series):
    +    """
    +    Create an Arrow record batch from the given pandas.Series or list of Series, with optional type.
    +
    +    :param series: A single pandas.Series, list of Series, or list of (series, arrow_type)
    +    :param copy: Option to make a copy of the series before performing any type casts
    --- End diff --
    
    We can remove this description.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83579/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    BTW, https://github.com/apache/spark/pull/19459#discussion_r145034007 looks missed :).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145292822
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,39 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(pdf_slice, schema=arrow_schema, preserve_index=False)
    +                   for pdf_slice in pdf_slices]
    +
    +        # Verify schema, there will be at least 1 batch from pandas.DataFrame
    +        schema_from_arrow = from_arrow_schema(batches[0].schema)
    +        if schema is not None and schema != schema_from_arrow:
    +            raise ValueError("Supplied schema does not match result from Arrow\nsupplied: " +
    +                             "%s\n!=\nfrom Arrow: %s" % (str(schema), str(schema_from_arrow)))
    --- End diff --
    
    Oh, actually we do have for warning in Python:
    
    ```python
    import warnings
    warnings.warn("...")
    ```



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #82601 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82601/testReport)** for PR 19459 at commit [`c7ddee6`](https://github.com/apache/spark/commit/c7ddee6b7ab91c1651a397a716ed91ed2a8383a3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #83001 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83001/testReport)** for PR 19459 at commit [`f421e2d`](https://github.com/apache/spark/commit/f421e2da1e97dfbc7c80b7ae724b6ea9a472b220).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144348462
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
             except Exception:
                 has_pandas = False
             if has_pandas and isinstance(data, pandas.DataFrame):
    -            if schema is None:
    -                schema = [str(x) for x in data.columns]
    -            data = [r.tolist() for r in data.to_records(index=False)]
    +            if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
    +                    and len(data) > 0:
    +                from pyspark.serializers import ArrowSerializer
    +                from pyspark.sql.types import from_arrow_schema
    +                import pyarrow as pa
    +
    +                # Slice the DataFrame into batches
    +                split = -(-len(data) // self.sparkContext.defaultParallelism)  # round int up
    +                slices = (data[i:i + split] for i in xrange(0, len(data), split))
    +                batches = [pa.RecordBatch.from_pandas(sliced_df, preserve_index=False)
    +                           for sliced_df in slices]
    +
    +                # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +                import os
    +                from tempfile import NamedTemporaryFile
    +                tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +                try:
    +                    serializer = ArrowSerializer()
    +                    serializer.dump_stream(batches, tempFile)
    +                    tempFile.close()
    +                    readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +                    jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +                finally:
    +                    # readRDDFromFile eagerily reads the file so we can delete right after.
    +                    os.unlink(tempFile.name)
    +
    +                # Create the Spark DataFrame, there will be at least 1 batch
    +                schema = from_arrow_schema(batches[0].schema)
    +                jdf = self._jvm.PythonSQLUtils.arrowPayloadToDataFrame(
    +                    jrdd, schema.json(), self._wrapped._jsqlContext)
    +                df = DataFrame(jdf, self._wrapped)
    +                df._schema = schema
    --- End diff --
    
    I'd leave some comments here about why `_schema` should be manually set rather than using it's own schema. Actually, mind if I ask why should we set this manually?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144340072
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
             except Exception:
                 has_pandas = False
             if has_pandas and isinstance(data, pandas.DataFrame):
    -            if schema is None:
    -                schema = [str(x) for x in data.columns]
    -            data = [r.tolist() for r in data.to_records(index=False)]
    +            if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
    +                    and len(data) > 0:
    +                from pyspark.serializers import ArrowSerializer
    +                from pyspark.sql.types import from_arrow_schema
    +                import pyarrow as pa
    +
    +                # Slice the DataFrame into batches
    +                split = -(-len(data) // self.sparkContext.defaultParallelism)  # round int up
    +                slices = (data[i:i + split] for i in xrange(0, len(data), split))
    +                batches = [pa.RecordBatch.from_pandas(sliced_df, preserve_index=False)
    +                           for sliced_df in slices]
    +
    +                # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +                import os
    +                from tempfile import NamedTemporaryFile
    --- End diff --
    
    I'd put those imports above with `import pyarrow as pa` or top of this file ... 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144827985
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelsize. If a schema is passed in, the
    --- End diff --
    
    typo: `parallelsize` -> `parallelize`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144829187
  
    --- Diff: python/pyspark/sql/types.py ---
    @@ -1624,6 +1624,50 @@ def to_arrow_type(dt):
         return arrow_type
     
     
    +def to_arrow_schema(schema):
    +    """ Convert a schema from Spark to Arrow
    +    """
    +    import pyarrow as pa
    +    fields = [pa.field(field.name, to_arrow_type(field.dataType)) for field in schema]
    --- End diff --
    
    We should add `nullable=field.nullable` just in case?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r143906469
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3095,16 +3095,32 @@ def setUpClass(cls):
                 StructField("3_long_t", LongType(), True),
                 StructField("4_float_t", FloatType(), True),
                 StructField("5_double_t", DoubleType(), True)])
    -        cls.data = [("a", 1, 10, 0.2, 2.0),
    -                    ("b", 2, 20, 0.4, 4.0),
    -                    ("c", 3, 30, 0.8, 6.0)]
    +        cls.data = [(u"a", 1, 10, 0.2, 2.0),
    +                    (u"b", 2, 20, 0.4, 4.0),
    +                    (u"c", 3, 30, 0.8, 6.0)]
    +
    +    @classmethod
    +    def tearDownClass(cls):
    +        ReusedPySparkTestCase.tearDownClass()
    +        cls.spark.stop()
     
         def assertFramesEqual(self, df_with_arrow, df_without):
             msg = ("DataFrame from Arrow is not equal" +
                    ("\n\nWith Arrow:\n%s\n%s" % (df_with_arrow, df_with_arrow.dtypes)) +
                    ("\n\nWithout:\n%s\n%s" % (df_without, df_without.dtypes)))
             self.assertTrue(df_without.equals(df_with_arrow), msg=msg)
     
    +    def createPandasDataFrameFromeData(self):
    --- End diff --
    
    nit: typo `createPandasDataFrameFromeData` -> `createPandasDataFrameFromData`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r149760058
  
    --- Diff: python/pyspark/serializers.py ---
    @@ -213,7 +213,15 @@ def __repr__(self):
             return "ArrowSerializer"
     
     
    -def _create_batch(series):
    +def _create_batch(series, copy=False):
    --- End diff --
    
    Right, I forgot that `fillna` returns a copy.  Do you think it would be worth it to first check for any nulls and only `fillna` if needed?  The mask of nulls is already created so just need to add a function like this in `_create_batch`:
    
    ```
    def fill_series_nulls(s, mask):
        return s.fillna(0) if mask.any() else s
    ```
    What do you think @ueshin ?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #82560 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82560/testReport)** for PR 19459 at commit [`06b033f`](https://github.com/apache/spark/commit/06b033f9e6461b5f7394a7d19896a0f614de6791).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145858362
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,73 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type, _cast_pandas_series_type
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        if schema is None or isinstance(schema, list):
    +            batches = [pa.RecordBatch.from_pandas(pdf_slice, preserve_index=False)
    +                       for pdf_slice in pdf_slices]
    +
    +            # There will be at least 1 batch after slicing the pandas.DataFrame
    +            schema_from_arrow = from_arrow_schema(batches[0].schema)
    +
    +            # If passed schema as a list of names then rename fields
    +            if isinstance(schema, list):
    +                fields = []
    +                for i, field in enumerate(schema_from_arrow):
    +                    field.name = schema[i]
    +                    fields.append(field)
    +                schema = StructType(fields)
    +            else:
    +                schema = schema_from_arrow
    +        else:
    +            batches = []
    +            for i, pdf_slice in enumerate(pdf_slices):
    +
    +                # convert to series to pyarrow.Arrays to use mask when creating Arrow batches
    +                arrs = []
    +                names = []
    +                for c, (_, series) in enumerate(pdf_slice.iteritems()):
    +                    field = schema[c]
    +                    names.append(field.name)
    +                    t = to_arrow_type(field.dataType)
    +                    try:
    +                        # NOTE: casting is not necessary with Arrow >= 0.7
    +                        arrs.append(pa.Array.from_pandas(_cast_pandas_series_type(series, t),
    +                                                         mask=series.isnull(), type=t))
    +                    except ValueError as e:
    +                        warnings.warn("Arrow will not be used in createDataFrame: %s" % str(e))
    +                        return None
    +                batches.append(pa.RecordBatch.from_arrays(arrs, names))
    +
    +                # Verify schema of first batch, return None if not equal and fallback without Arrow
    +                if i == 0:
    +                    schema_from_arrow = from_arrow_schema(batches[i].schema)
    +                    if schema != schema_from_arrow:
    +                        warnings.warn("Arrow will not be used in createDataFrame.\n" +
    --- End diff --
    
    OK by me too but let's keep on our eyes on mailing list and JIRAs that complains about it in the future, and improve it next time if this sounds more important than we think here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r143605840
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3147,6 +3150,14 @@ def test_filtered_frame(self):
             self.assertEqual(pdf.columns[0], "i")
             self.assertTrue(pdf.empty)
     
    +    def test_createDataFrame_toggle(self):
    +        pdf = self.createPandasDataFrameFromeData()
    +        self.spark.conf.set("spark.sql.execution.arrow.enable", "false")
    +        df_no_arrow = self.spark.createDataFrame(pdf)
    +        self.spark.conf.set("spark.sql.execution.arrow.enable", "true")
    --- End diff --
    
    done. I guess this would make the failure easier to see?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82559/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145032365
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelsize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        import os
    +        from tempfile import NamedTemporaryFile
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(df) // self.sparkContext.defaultParallelism)  # round int up
    +        df_slices = (df[start:start + step] for start in xrange(0, len(df), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(df_slice, schema=arrow_schema, preserve_index=False)
    +                   for df_slice in df_slices]
    +
    +        # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +        try:
    +            serializer = ArrowSerializer()
    +            serializer.dump_stream(batches, tempFile)
    +            tempFile.close()
    +            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +            jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +        finally:
    +            # readRDDFromFile eagerily reads the file so we can delete right after.
    +            os.unlink(tempFile.name)
    +
    +        # Create the Spark DataFrame, there will be at least 1 batch
    +        schema = from_arrow_schema(batches[0].schema)
    --- End diff --
    
    Can we adjust data types if `schema is not None` like we did at `ArrowStreamPandasSerializer.dump_stream`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Merged to master.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145293860
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,39 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(pdf_slice, schema=arrow_schema, preserve_index=False)
    +                   for pdf_slice in pdf_slices]
    --- End diff --
    
    However, if we go 1. way, I think we should avoid creating whole batches first. I think falling back might make sense if its cost is cheap.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145035994
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelsize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        import os
    +        from tempfile import NamedTemporaryFile
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(df) // self.sparkContext.defaultParallelism)  # round int up
    +        df_slices = (df[start:start + step] for start in xrange(0, len(df), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(df_slice, schema=arrow_schema, preserve_index=False)
    +                   for df_slice in df_slices]
    +
    +        # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +        try:
    +            serializer = ArrowSerializer()
    +            serializer.dump_stream(batches, tempFile)
    +            tempFile.close()
    +            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +            jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +        finally:
    +            # readRDDFromFile eagerily reads the file so we can delete right after.
    +            os.unlink(tempFile.name)
    +
    +        # Create the Spark DataFrame, there will be at least 1 batch
    +        schema = from_arrow_schema(batches[0].schema)
    --- End diff --
    
    Sorry @HyukjinKwon , I misread your example.  Yes, we can fall back to not use arrow if the user supplied schema doesn't match from arrow - I actually get equal results from your example tho
    
    > can we adjust data types if schema is not None
    
    @ueshin we could do this too, I'm not sure what would be better in this case.. I'll think about it for a bit


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r143606693
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3147,6 +3150,14 @@ def test_filtered_frame(self):
             self.assertEqual(pdf.columns[0], "i")
             self.assertTrue(pdf.empty)
     
    +    def test_createDataFrame_toggle(self):
    +        pdf = self.createPandasDataFrameFromeData()
    +        self.spark.conf.set("spark.sql.execution.arrow.enable", "false")
    +        df_no_arrow = self.spark.createDataFrame(pdf)
    +        self.spark.conf.set("spark.sql.execution.arrow.enable", "true")
    --- End diff --
    
    Hmmm, I thought the `tearDownClass` was there but it's actually in #18664.  Maybe I should put it in here since that needs some more discussion.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145336060
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,39 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(pdf_slice, schema=arrow_schema, preserve_index=False)
    +                   for pdf_slice in pdf_slices]
    +
    +        # Verify schema, there will be at least 1 batch from pandas.DataFrame
    +        schema_from_arrow = from_arrow_schema(batches[0].schema)
    +        if schema is not None and schema != schema_from_arrow:
    +            raise ValueError("Supplied schema does not match result from Arrow\nsupplied: " +
    +                             "%s\n!=\nfrom Arrow: %s" % (str(schema), str(schema_from_arrow)))
    --- End diff --
    
    Sure, I couldn't agree more with ^.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145859471
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -510,6 +578,12 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
             except Exception:
                 has_pandas = False
             if has_pandas and isinstance(data, pandas.DataFrame):
    +            if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
    +                    and len(data) > 0:
    +                df = self._createFromPandasWithArrow(data, schema)
    +                # Fallback to create DataFrame without arrow if return None
    +                if df is not None:
    --- End diff --
    
    Shall we show some log message to users in this case?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144828405
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelsize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        import os
    +        from tempfile import NamedTemporaryFile
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(df) // self.sparkContext.defaultParallelism)  # round int up
    +        df_slices = (df[start:start + step] for start in xrange(0, len(df), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(df_slice, schema=arrow_schema, preserve_index=False)
    +                   for df_slice in df_slices]
    +
    +        # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +        try:
    +            serializer = ArrowSerializer()
    +            serializer.dump_stream(batches, tempFile)
    +            tempFile.close()
    +            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +            jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +        finally:
    +            # readRDDFromFile eagerily reads the file so we can delete right after.
    +            os.unlink(tempFile.name)
    +
    +        # Create the Spark DataFrame, there will be at least 1 batch
    +        schema = from_arrow_schema(batches[0].schema)
    --- End diff --
    
    `schema = from_arrow_schema(batches[0].schema) if schema is None else schema`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r150305976
  
    --- Diff: python/pyspark/serializers.py ---
    @@ -225,11 +232,11 @@ def _create_batch(series):
         # If a nullable integer series has been promoted to floating point with NaNs, need to cast
         # NOTE: this is not necessary with Arrow >= 0.7
         def cast_series(s, t):
    -        if type(t) == pa.TimestampType:
    +        if t is not None and type(t) == pa.TimestampType:
    --- End diff --
    
    This doesn't seem to be needed anymore.  It came from an error when comparing pyarrow type instances to None.
    ```
    >>> import pyarrow as pa
    >>> type(None) == pa.TimestampType
    False
    >>> None == pa.date32()
    Segmentation fault
    ```
    So this check is still needed right below when we check for date32().  I can't remember if this was fixed in current versions of pyarrow, but I'll add a note here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #82560 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82560/testReport)** for PR 19459 at commit [`06b033f`](https://github.com/apache/spark/commit/06b033f9e6461b5f7394a7d19896a0f614de6791).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145611544
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,73 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type, _cast_pandas_series_type
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        if schema is None or isinstance(schema, list):
    +            batches = [pa.RecordBatch.from_pandas(pdf_slice, preserve_index=False)
    +                       for pdf_slice in pdf_slices]
    +
    +            # There will be at least 1 batch after slicing the pandas.DataFrame
    +            schema_from_arrow = from_arrow_schema(batches[0].schema)
    +
    +            # If passed schema as a list of names then rename fields
    +            if isinstance(schema, list):
    +                fields = []
    +                for i, field in enumerate(schema_from_arrow):
    +                    field.name = schema[i]
    +                    fields.append(field)
    +                schema = StructType(fields)
    +            else:
    +                schema = schema_from_arrow
    +        else:
    +            batches = []
    +            for i, pdf_slice in enumerate(pdf_slices):
    +
    +                # convert to series to pyarrow.Arrays to use mask when creating Arrow batches
    +                arrs = []
    +                names = []
    +                for c, (_, series) in enumerate(pdf_slice.iteritems()):
    +                    field = schema[c]
    +                    names.append(field.name)
    +                    t = to_arrow_type(field.dataType)
    +                    try:
    +                        # NOTE: casting is not necessary with Arrow >= 0.7
    +                        arrs.append(pa.Array.from_pandas(_cast_pandas_series_type(series, t),
    +                                                         mask=series.isnull(), type=t))
    +                    except ValueError as e:
    +                        warnings.warn("Arrow will not be used in createDataFrame: %s" % str(e))
    +                        return None
    +                batches.append(pa.RecordBatch.from_arrays(arrs, names))
    +
    +                # Verify schema of first batch, return None if not equal and fallback without Arrow
    +                if i == 0:
    +                    schema_from_arrow = from_arrow_schema(batches[i].schema)
    +                    if schema != schema_from_arrow:
    +                        warnings.warn("Arrow will not be used in createDataFrame.\n" +
    --- End diff --
    
    If we won't reach the block, I think we can simplify `_createFromPandasWithArrow()` like https://github.com/BryanCutler/spark/pull/28.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145293209
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,39 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(pdf_slice, schema=arrow_schema, preserve_index=False)
    +                   for pdf_slice in pdf_slices]
    +
    +        # Verify schema, there will be at least 1 batch from pandas.DataFrame
    +        schema_from_arrow = from_arrow_schema(batches[0].schema)
    +        if schema is not None and schema != schema_from_arrow:
    +            raise ValueError("Supplied schema does not match result from Arrow\nsupplied: " +
    +                             "%s\n!=\nfrom Arrow: %s" % (str(schema), str(schema_from_arrow)))
    --- End diff --
    
    I'd prefer 1. if anything becomes too complicated to match the results for now. I guess 2. could be failed for any unexpected reason comparing `createDataFrame` without Arrow so I thought 1. is required as a guarantee and 2. is good to do. I wonder what @ueshin thinks about this.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r143607522
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
    @@ -203,4 +205,16 @@ private[sql] object ArrowConverters {
           reader.close()
         }
       }
    +
    +  def toDataFrame(
    --- End diff --
    
    I had to make this public to be callable with py4j.  Alternatively, something could be added to `o.a.s.sql.api.python.PythonSQLUtils`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83635/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r146421298
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,73 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type, _cast_pandas_series_type
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        if schema is None or isinstance(schema, list):
    +            batches = [pa.RecordBatch.from_pandas(pdf_slice, preserve_index=False)
    +                       for pdf_slice in pdf_slices]
    +
    +            # There will be at least 1 batch after slicing the pandas.DataFrame
    +            schema_from_arrow = from_arrow_schema(batches[0].schema)
    +
    +            # If passed schema as a list of names then rename fields
    +            if isinstance(schema, list):
    +                fields = []
    +                for i, field in enumerate(schema_from_arrow):
    +                    field.name = schema[i]
    +                    fields.append(field)
    +                schema = StructType(fields)
    +            else:
    +                schema = schema_from_arrow
    +        else:
    +            batches = []
    +            for i, pdf_slice in enumerate(pdf_slices):
    +
    +                # convert to series to pyarrow.Arrays to use mask when creating Arrow batches
    +                arrs = []
    +                names = []
    +                for c, (_, series) in enumerate(pdf_slice.iteritems()):
    +                    field = schema[c]
    +                    names.append(field.name)
    +                    t = to_arrow_type(field.dataType)
    +                    try:
    +                        # NOTE: casting is not necessary with Arrow >= 0.7
    +                        arrs.append(pa.Array.from_pandas(_cast_pandas_series_type(series, t),
    +                                                         mask=series.isnull(), type=t))
    +                    except ValueError as e:
    +                        warnings.warn("Arrow will not be used in createDataFrame: %s" % str(e))
    +                        return None
    +                batches.append(pa.RecordBatch.from_arrays(arrs, names))
    +
    +                # Verify schema of first batch, return None if not equal and fallback without Arrow
    +                if i == 0:
    +                    schema_from_arrow = from_arrow_schema(batches[i].schema)
    +                    if schema != schema_from_arrow:
    +                        warnings.warn("Arrow will not be used in createDataFrame.\n" +
    --- End diff --
    
    ok, merging now


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r146436616
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,52 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer, _create_batch
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        if schema is None or isinstance(schema, list):
    +            batches = [pa.RecordBatch.from_pandas(pdf_slice, preserve_index=False)
    +                       for pdf_slice in pdf_slices]
    +
    +            # There will be at least 1 batch after slicing the pandas.DataFrame
    +            schema_from_arrow = from_arrow_schema(batches[0].schema)
    +
    +            # If passed schema as a list of names then rename fields
    +            if isinstance(schema, list):
    +                fields = []
    +                for i, field in enumerate(schema_from_arrow):
    +                    field.name = schema[i]
    +                    fields.append(field)
    +                schema = StructType(fields)
    +            else:
    +                schema = schema_from_arrow
    +        else:
    +            if not isinstance(schema, StructType) and isinstance(schema, DataType):
    +                schema = StructType().add("value", schema)
    --- End diff --
    
    BTW, I think we should not support this case:
    
    ```python
    >>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    >>> spark.createDataFrame(pd.DataFrame([{"a": 1}]), "int").show()
    ```
    
    ```
    +-----+
    |value|
    +-----+
    |    1|
    +-----+
    ```
    
    ```python
    >>> spark.conf.set("spark.sql.execution.arrow.enabled", "false")
    >>> spark.createDataFrame(pd.DataFrame([{"a": 1}]), "int").show()
    ```
    
    ```
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/.../spark/python/pyspark/sql/session.py", line 595, in createDataFrame
        rdd, schema = self._createFromLocal(map(prepare, data), schema)
      File "/.../spark/python/pyspark/sql/session.py", line 399, in _createFromLocal
        data = list(data)
      File "/.../spark/python/pyspark/sql/session.py", line 585, in prepare
        verify_func(obj)
      File "/.../spark/python/pyspark/sql/types.py", line 1411, in verify
        verify_value(obj)
      File "/.../spark/python/pyspark/sql/types.py", line 1337, in verify_integer
        verify_acceptable_types(obj)
      File "/.../spark/python/pyspark/sql/types.py", line 1300, in verify_acceptable_types
        % (dataType, obj, type(obj))))
    TypeError: field value: IntegerType can not accept object (1,) in type <type 'tuple'>
    ```
    
    I thought disallowing it is actually more consistent with normal Python lists:
    
    
    ```python
    >>> spark.createDataFrame([1], "int").show()
    ```
    
    ```
    +-----+
    |value|
    +-----+
    |    1|
    +-----+
    ```
    
    ```python
    >>> spark.createDataFrame([[1]], "int").show()
    ```
    
    ```
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/U.../spark/python/pyspark/sql/session.py", line 595, in createDataFrame
        rdd, schema = self._createFromLocal(map(prepare, data), schema)
      File "/.../spark/python/pyspark/sql/session.py", line 399, in _createFromLocal
        data = list(data)
      File "/.../spark/python/pyspark/sql/session.py", line 585, in prepare
        verify_func(obj)
      File "/.../spark/python/pyspark/sql/types.py", line 1411, in verify
        verify_value(obj)
      File "/.../spark/python/pyspark/sql/types.py", line 1337, in verify_integer
        verify_acceptable_types(obj)
      File "/.../spark/python/pyspark/sql/types.py", line 1300, in verify_acceptable_types
        % (dataType, obj, type(obj))))
    TypeError: field value: IntegerType can not accept object [1] in type <type 'list'>
    ```
    
    If we need to support this, I think it should print as below:
    
    ```python
    >>> spark.createDataFrame([[1]], "string").show()
    ```
    
    ```
    +-----+
    |value|
    +-----+
    |  [1]|
    +-----+
    ```
    
    although, I am not sure if we intended to support this:
    
    ```python
    >>> spark.conf.set("spark.sql.execution.arrow.enabled", "false")
    >>> spark.createDataFrame(pd.DataFrame([{"a": 1}]), "string").show()
    ```
    
    ```
    +--------------------+
    |               value|
    +--------------------+
    |[Ljava.lang.Objec...|
    +--------------------+
    ```
    
    So, my suggestion is, simply falling back in this case, assuming we didn't intend to support this case and this usecase looks rare as the existing case without Arrow already prints an werid result or throws an error.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r146421804
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,73 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type, _cast_pandas_series_type
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        if schema is None or isinstance(schema, list):
    +            batches = [pa.RecordBatch.from_pandas(pdf_slice, preserve_index=False)
    +                       for pdf_slice in pdf_slices]
    +
    +            # There will be at least 1 batch after slicing the pandas.DataFrame
    +            schema_from_arrow = from_arrow_schema(batches[0].schema)
    +
    +            # If passed schema as a list of names then rename fields
    +            if isinstance(schema, list):
    +                fields = []
    +                for i, field in enumerate(schema_from_arrow):
    +                    field.name = schema[i]
    +                    fields.append(field)
    +                schema = StructType(fields)
    +            else:
    +                schema = schema_from_arrow
    +        else:
    +            batches = []
    +            for i, pdf_slice in enumerate(pdf_slices):
    +
    +                # convert to series to pyarrow.Arrays to use mask when creating Arrow batches
    +                arrs = []
    +                names = []
    +                for c, (_, series) in enumerate(pdf_slice.iteritems()):
    +                    field = schema[c]
    +                    names.append(field.name)
    +                    t = to_arrow_type(field.dataType)
    +                    try:
    +                        # NOTE: casting is not necessary with Arrow >= 0.7
    +                        arrs.append(pa.Array.from_pandas(_cast_pandas_series_type(series, t),
    --- End diff --
    
    I think that is a problem with using `astype` which doesn't provide any checks afaik.  This casting is better done in Arrow, but since we are currently stuck on 0.4.1 we need this workaround.  Trying this out with the latest arrow would give the user a nice error:
    ```
    >>> pa.Array.from_pandas(s, type=pa.int16())
    <pyarrow.lib.Int16Array object at 0x7f18361fecb0>
    [
      1,
      2,
      10001
    ]
    >>> pa.Array.from_pandas(s, type=pa.int8())
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "pyarrow/array.pxi", line 279, in pyarrow.lib.Array.from_pandas (/home/bryan/git/arrow/python/build/temp.linux-x86_64-2.7/lib.cxx:25865)
      File "pyarrow/array.pxi", line 169, in pyarrow.lib.array (/home/bryan/git/arrow/python/build/temp.linux-x86_64-2.7/lib.cxx:24833)
      File "pyarrow/array.pxi", line 70, in pyarrow.lib._ndarray_to_array (/home/bryan/git/arrow/python/build/temp.linux-x86_64-2.7/lib.cxx:24083)
      File "pyarrow/error.pxi", line 77, in pyarrow.lib.check_status (/home/bryan/git/arrow/python/build/temp.linux-x86_64-2.7/lib.cxx:7876)
    pyarrow.lib.ArrowInvalid: Integer value out of bounds
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Thanks @HyukjinKwon @ueshin  and @viirya !


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Thanks for the reviews @ueshin and @HyukjinKwon!  I added `to_arrow_schema` conversion for when a schema is passed into `createDataFrame` and added some new tests to verify it. Please take another look when you can, thanks!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82894/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144931496
  
    --- Diff: python/pyspark/sql/types.py ---
    @@ -1624,6 +1624,50 @@ def to_arrow_type(dt):
         return arrow_type
     
     
    +def to_arrow_schema(schema):
    +    """ Convert a schema from Spark to Arrow
    +    """
    +    import pyarrow as pa
    +    fields = [pa.field(field.name, to_arrow_type(field.dataType)) for field in schema]
    --- End diff --
    
    yeah, good idea


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #82894 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82894/testReport)** for PR 19459 at commit [`3052f30`](https://github.com/apache/spark/commit/3052f3063e965d3636dd172a6981d93155b77fd2).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r150228512
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -454,13 +454,60 @@ def _convert_from_pandas(self, pdf, schema):
     
             # Check if any columns need to be fixed for Spark to infer properly
             if len(np_records) > 0:
    -            record_type_list = self._get_numpy_record_dtypes(np_records[0])
    -            if record_type_list is not None:
    -                return [r.astype(record_type_list).tolist() for r in np_records], schema
    +            record_dtype = self._get_numpy_record_dtype(np_records[0])
    +            if record_dtype is not None:
    +                return [r.astype(record_dtype).tolist() for r in np_records], schema
     
             # Convert list of numpy records to python lists
             return [r.tolist() for r in np_records], schema
     
    +    def _create_from_pandas_with_arrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer, _create_batch
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType
    +        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
    +
    +        # Determine arrow types to coerce data when creating batches
    +        if isinstance(schema, StructType):
    +            arrow_types = [to_arrow_type(f.dataType) for f in schema.fields]
    +        elif isinstance(schema, DataType):
    +            raise ValueError("Single data type %s is not supported with Arrow" % str(schema))
    +        else:
    +            # Any timestamps must be coerced to be compatible with Spark
    +            arrow_types = [to_arrow_type(TimestampType())
    +                           if is_datetime64_dtype(t) or is_datetime64tz_dtype(t) else None
    +                           for t in pdf.dtypes]
    +
    +        # Slice the DataFrame to be batched
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        # Create Arrow record batches
    +        batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)])
    +                   for pdf_slice in pdf_slices]
    +
    +        # Create the Spark schema from the first Arrow batch (always at least 1 batch after slicing)
    +        if schema is None or isinstance(schema, list):
    +            schema_from_arrow = from_arrow_schema(batches[0].schema)
    +            names = pdf.columns if schema is None else schema
    --- End diff --
    
    Could we maybe just resemble
    
    https://github.com/apache/spark/blob/1d341042d6948e636643183da9bf532268592c6a/python/pyspark/sql/session.py#L403-L411
    
    just to be more readable in a way?
    
    ```python
    if schema is None or isinstance(schema, (list, tuple)):
        struct = from_arrow_schema(batches[0].schema)
        if isinstance(schema, (list, tuple)):
            for i, name in enumerate(schema):
                struct.fields[i].name = name
                struct.names[i] = name
        schema = struct
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144706853
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelsize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        import os
    +        from tempfile import NamedTemporaryFile
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(df) // self.sparkContext.defaultParallelism)  # round int up
    +        df_slices = (df[start:start + step] for start in xrange(0, len(df), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(df_slice, schema=arrow_schema, preserve_index=False)
    +                   for df_slice in df_slices]
    +
    +        # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    --- End diff --
    
    This looks kind of duplicate with the main logic of `context.parallelize`. Maybe we can extract a common function from it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144931295
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala ---
    @@ -29,4 +32,19 @@ private[sql] object PythonSQLUtils {
       def listBuiltinFunctionInfos(): Array[ExpressionInfo] = {
         FunctionRegistry.functionSet.flatMap(f => FunctionRegistry.builtin.lookupFunction(f)).toArray
       }
    +
    +  /**
    +   * Python Callable function to convert ArrowPayloads into a [[DataFrame]].
    +   *
    +   * @param payloadRDD A JavaRDD of ArrowPayloads.
    +   * @param schemaString JSON Formatted Schema for ArrowPayloads.
    +   * @param sqlContext The active [[SQLContext]].
    +   * @return The converted [[DataFrame]].
    +   */
    +  def arrowPayloadToDataFrame(
    +       payloadRDD: JavaRDD[Array[Byte]],
    +       schemaString: String,
    +       sqlContext: SQLContext): DataFrame = {
    --- End diff --
    
    oh man, good catch! I don't know how that happened :\


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r143821657
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
    @@ -203,4 +205,16 @@ private[sql] object ArrowConverters {
           reader.close()
         }
       }
    +
    +  def toDataFrame(
    --- End diff --
    
    I left the conversion logic in `ArrowConverters` because I think there is a good chance it will change, so just added a wrapper to `PythonSQLUtils` let me know if it's ok.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r150314957
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -454,13 +454,60 @@ def _convert_from_pandas(self, pdf, schema):
     
             # Check if any columns need to be fixed for Spark to infer properly
             if len(np_records) > 0:
    -            record_type_list = self._get_numpy_record_dtypes(np_records[0])
    -            if record_type_list is not None:
    -                return [r.astype(record_type_list).tolist() for r in np_records], schema
    +            record_dtype = self._get_numpy_record_dtype(np_records[0])
    +            if record_dtype is not None:
    +                return [r.astype(record_dtype).tolist() for r in np_records], schema
     
             # Convert list of numpy records to python lists
             return [r.tolist() for r in np_records], schema
     
    +    def _create_from_pandas_with_arrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer, _create_batch
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType
    +        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
    +
    +        # Determine arrow types to coerce data when creating batches
    +        if isinstance(schema, StructType):
    +            arrow_types = [to_arrow_type(f.dataType) for f in schema.fields]
    +        elif isinstance(schema, DataType):
    +            raise ValueError("Single data type %s is not supported with Arrow" % str(schema))
    +        else:
    +            # Any timestamps must be coerced to be compatible with Spark
    +            arrow_types = [to_arrow_type(TimestampType())
    +                           if is_datetime64_dtype(t) or is_datetime64tz_dtype(t) else None
    +                           for t in pdf.dtypes]
    +
    +        # Slice the DataFrame to be batched
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        # Create Arrow record batches
    +        batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)])
    +                   for pdf_slice in pdf_slices]
    +
    +        # Create the Spark schema from the first Arrow batch (always at least 1 batch after slicing)
    +        if schema is None or isinstance(schema, list):
    --- End diff --
    
    Looks like this check should include tuples as well for converting from unicode?
    https://github.com/apache/spark/pull/19459/files#diff-3b5463566251d5b09fd328738a9e9bc5L579
    I'll change that since it's related..


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83647/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145029049
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelsize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        import os
    +        from tempfile import NamedTemporaryFile
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(df) // self.sparkContext.defaultParallelism)  # round int up
    +        df_slices = (df[start:start + step] for start in xrange(0, len(df), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(df_slice, schema=arrow_schema, preserve_index=False)
    +                   for df_slice in df_slices]
    +
    +        # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    --- End diff --
    
    Sounds good to me.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    I think it is a bug, we should fix it first.
    
    BTW I'm fine to upgrade arrow, just make sure we get everything we need at the arrow version we wanna upgrade, then remove all the hacks at Spark side. We should throw exception if users have an old arrow version installed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    LGTM with few minor comments.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r146431078
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,73 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type, _cast_pandas_series_type
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        if schema is None or isinstance(schema, list):
    +            batches = [pa.RecordBatch.from_pandas(pdf_slice, preserve_index=False)
    +                       for pdf_slice in pdf_slices]
    +
    +            # There will be at least 1 batch after slicing the pandas.DataFrame
    +            schema_from_arrow = from_arrow_schema(batches[0].schema)
    +
    +            # If passed schema as a list of names then rename fields
    +            if isinstance(schema, list):
    +                fields = []
    +                for i, field in enumerate(schema_from_arrow):
    +                    field.name = schema[i]
    +                    fields.append(field)
    +                schema = StructType(fields)
    +            else:
    +                schema = schema_from_arrow
    +        else:
    +            batches = []
    +            for i, pdf_slice in enumerate(pdf_slices):
    +
    +                # convert to series to pyarrow.Arrays to use mask when creating Arrow batches
    +                arrs = []
    +                names = []
    +                for c, (_, series) in enumerate(pdf_slice.iteritems()):
    +                    field = schema[c]
    +                    names.append(field.name)
    +                    t = to_arrow_type(field.dataType)
    +                    try:
    +                        # NOTE: casting is not necessary with Arrow >= 0.7
    +                        arrs.append(pa.Array.from_pandas(_cast_pandas_series_type(series, t),
    +                                                         mask=series.isnull(), type=t))
    +                    except ValueError as e:
    --- End diff --
    
    yeah, there doesn't seem to be a way to guard against overflow with `astype`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r149874063
  
    --- Diff: python/pyspark/serializers.py ---
    @@ -213,7 +213,15 @@ def __repr__(self):
             return "ArrowSerializer"
     
     
    -def _create_batch(series):
    +def _create_batch(series, copy=False):
    --- End diff --
    
    Yeah, we don't want to end up double copying if `copy=True`.  Let me try something and if it ends up making things too complicated then we can remove the copy flag altogether and just rely on `fillna(0)` to always make a copy - not ideal but will be more simple


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144931384
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelsize. If a schema is passed in, the
    --- End diff --
    
    Thanks, fixed!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #82866 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82866/testReport)** for PR 19459 at commit [`81ddfa9`](https://github.com/apache/spark/commit/81ddfa9afa03531edd9ac7b805a09be2be96d88c).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #83635 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83635/testReport)** for PR 19459 at commit [`421d0be`](https://github.com/apache/spark/commit/421d0beafe0aeff8e689fa05af0505a4c8b1c556).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #83635 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83635/testReport)** for PR 19459 at commit [`421d0be`](https://github.com/apache/spark/commit/421d0beafe0aeff8e689fa05af0505a4c8b1c556).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144706672
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
    --- End diff --
    
    typo: slicing the.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144601470
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
             except Exception:
                 has_pandas = False
             if has_pandas and isinstance(data, pandas.DataFrame):
    -            if schema is None:
    -                schema = [str(x) for x in data.columns]
    -            data = [r.tolist() for r in data.to_records(index=False)]
    +            if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
    +                    and len(data) > 0:
    +                from pyspark.serializers import ArrowSerializer
    +                from pyspark.sql.types import from_arrow_schema
    +                import pyarrow as pa
    +
    +                # Slice the DataFrame into batches
    +                split = -(-len(data) // self.sparkContext.defaultParallelism)  # round int up
    +                slices = (data[i:i + split] for i in xrange(0, len(data), split))
    +                batches = [pa.RecordBatch.from_pandas(sliced_df, preserve_index=False)
    +                           for sliced_df in slices]
    +
    +                # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +                import os
    +                from tempfile import NamedTemporaryFile
    +                tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +                try:
    +                    serializer = ArrowSerializer()
    +                    serializer.dump_stream(batches, tempFile)
    +                    tempFile.close()
    +                    readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +                    jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +                finally:
    +                    # readRDDFromFile eagerily reads the file so we can delete right after.
    +                    os.unlink(tempFile.name)
    +
    +                # Create the Spark DataFrame, there will be at least 1 batch
    +                schema = from_arrow_schema(batches[0].schema)
    +                jdf = self._jvm.PythonSQLUtils.arrowPayloadToDataFrame(
    +                    jrdd, schema.json(), self._wrapped._jsqlContext)
    +                df = DataFrame(jdf, self._wrapped)
    +                df._schema = schema
    --- End diff --
    
    If the schema is not set here, then it will lazily create it through a py4j exchange with the java DataFrame.  Since we already have it here, we can just set it and save some time.  I don't like manually setting it like this though, it should be an optional arg in the DataFrame constructor.  I'll make that change, but if you prefer not to do that I can revert.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #82764 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82764/testReport)** for PR 19459 at commit [`f42e351`](https://github.com/apache/spark/commit/f42e35175969d8d7363e008a586a6f6982290447).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r150324146
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -454,13 +454,60 @@ def _convert_from_pandas(self, pdf, schema):
     
             # Check if any columns need to be fixed for Spark to infer properly
             if len(np_records) > 0:
    -            record_type_list = self._get_numpy_record_dtypes(np_records[0])
    -            if record_type_list is not None:
    -                return [r.astype(record_type_list).tolist() for r in np_records], schema
    +            record_dtype = self._get_numpy_record_dtype(np_records[0])
    +            if record_dtype is not None:
    +                return [r.astype(record_dtype).tolist() for r in np_records], schema
     
             # Convert list of numpy records to python lists
             return [r.tolist() for r in np_records], schema
     
    +    def _create_from_pandas_with_arrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer, _create_batch
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType
    +        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
    +
    +        # Determine arrow types to coerce data when creating batches
    +        if isinstance(schema, StructType):
    +            arrow_types = [to_arrow_type(f.dataType) for f in schema.fields]
    +        elif isinstance(schema, DataType):
    +            raise ValueError("Single data type %s is not supported with Arrow" % str(schema))
    +        else:
    +            # Any timestamps must be coerced to be compatible with Spark
    +            arrow_types = [to_arrow_type(TimestampType())
    +                           if is_datetime64_dtype(t) or is_datetime64tz_dtype(t) else None
    +                           for t in pdf.dtypes]
    +
    +        # Slice the DataFrame to be batched
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        # Create Arrow record batches
    +        batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)])
    +                   for pdf_slice in pdf_slices]
    +
    +        # Create the Spark schema from the first Arrow batch (always at least 1 batch after slicing)
    +        if schema is None or isinstance(schema, list):
    +            schema_from_arrow = from_arrow_schema(batches[0].schema)
    +            names = pdf.columns if schema is None else schema
    --- End diff --
    
    hmm, actually that case can be pulled out of here and non-Arrow `_create_from_pandas`, which @ueshin brought up in #19646 .  This would simplify quite a bit now, so I'll try that out.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r150229941
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3180,6 +3185,58 @@ def test_filtered_frame(self):
             self.assertEqual(pdf.columns[0], "i")
             self.assertTrue(pdf.empty)
     
    +    def test_createDataFrame_toggle(self):
    +        pdf = self.create_pandas_data_frame()
    +        self.spark.conf.set("spark.sql.execution.arrow.enabled", "false")
    +        try:
    +            df_no_arrow = self.spark.createDataFrame(pdf)
    +        finally:
    +            self.spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    +        df_arrow = self.spark.createDataFrame(pdf)
    +        self.assertEquals(df_no_arrow.collect(), df_arrow.collect())
    +
    +    def test_createDataFrame_with_schema(self):
    +        pdf = self.create_pandas_data_frame()
    +        df = self.spark.createDataFrame(pdf, schema=self.schema)
    +        self.assertEquals(self.schema, df.schema)
    +        pdf_arrow = df.toPandas()
    +        self.assertFramesEqual(pdf_arrow, pdf)
    +
    +    def test_createDataFrame_with_incorrect_schema(self):
    +        pdf = self.create_pandas_data_frame()
    +        wrong_schema = StructType([field for field in reversed(self.schema)])
    +        with QuietTest(self.sc):
    +            with self.assertRaisesRegexp(TypeError, ".*field.*can.not.accept.*type"):
    +                self.spark.createDataFrame(pdf, schema=wrong_schema)
    +
    +    def test_createDataFrame_with_names(self):
    +        pdf = self.create_pandas_data_frame()
    +        df = self.spark.createDataFrame(pdf, schema=list('abcdefg'))
    +        self.assertEquals(df.schema.fieldNames(), list('abcdefg'))
    --- End diff --
    
    As said above, let's add a test with a tuple too.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #83761 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83761/testReport)** for PR 19459 at commit [`6c72e37`](https://github.com/apache/spark/commit/6c72e37b0ca520d2756722ce2f18fae3ea32c39e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144598051
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
             except Exception:
                 has_pandas = False
             if has_pandas and isinstance(data, pandas.DataFrame):
    -            if schema is None:
    -                schema = [str(x) for x in data.columns]
    -            data = [r.tolist() for r in data.to_records(index=False)]
    +            if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
    +                    and len(data) > 0:
    +                from pyspark.serializers import ArrowSerializer
    --- End diff --
    
    That's probably a good idea since it's a big block of code.  The other create functions return a (rdd, schema) pair, then do further processing to create a DataFrame.  Here we would have to just return a DataFrame since we don't want to do the further processing.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #82559 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82559/testReport)** for PR 19459 at commit [`e9c6de7`](https://github.com/apache/spark/commit/e9c6de737a939ce8cbe3c921955662661024420e).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r149886093
  
    --- Diff: python/pyspark/serializers.py ---
    @@ -213,7 +213,15 @@ def __repr__(self):
             return "ArrowSerializer"
     
     
    -def _create_batch(series):
    +def _create_batch(series, copy=False):
    --- End diff --
    
    @ueshin this ended up having no effect, so I took it out.  For the case of Timestamps, the timezone conversions will make a copy regardless.  For the case of ints being promoted to floats then that means they will have null values and need to call `fillna(0)` which makes a copy anyway.  So it seems this only makes copies when necessary.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145034007
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelsize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        import os
    +        from tempfile import NamedTemporaryFile
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(df) // self.sparkContext.defaultParallelism)  # round int up
    +        df_slices = (df[start:start + step] for start in xrange(0, len(df), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(df_slice, schema=arrow_schema, preserve_index=False)
    +                   for df_slice in df_slices]
    +
    +        # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +        try:
    +            serializer = ArrowSerializer()
    +            serializer.dump_stream(batches, tempFile)
    +            tempFile.close()
    +            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +            jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +        finally:
    +            # readRDDFromFile eagerily reads the file so we can delete right after.
    +            os.unlink(tempFile.name)
    +
    +        # Create the Spark DataFrame, there will be at least 1 batch
    +        schema = from_arrow_schema(batches[0].schema)
    --- End diff --
    
    Btw, do we also need to support schema like `['name', 'age']`, `"int"`(not `StructType`), etc. from doctest.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144994355
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelsize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        import os
    +        from tempfile import NamedTemporaryFile
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(df) // self.sparkContext.defaultParallelism)  # round int up
    +        df_slices = (df[start:start + step] for start in xrange(0, len(df), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(df_slice, schema=arrow_schema, preserve_index=False)
    +                   for df_slice in df_slices]
    +
    +        # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    --- End diff --
    
    I'd prefer less duplicate. Let's see if others support it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r150445421
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -438,28 +438,70 @@ def _get_numpy_record_dtypes(self, rec):
                     curr_type = 'datetime64[us]'
                     has_rec_fix = True
                 record_type_list.append((str(col_names[i]), curr_type))
    -        return record_type_list if has_rec_fix else None
    +        return np.dtype(record_type_list) if has_rec_fix else None
     
    -    def _convert_from_pandas(self, pdf, schema):
    +    def _convert_from_pandas(self, pdf):
             """
              Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
    -         :return tuple of list of records and schema
    +         :return list of records
             """
    -        # If no schema supplied by user then get the names of columns only
    -        if schema is None:
    -            schema = [str(x) for x in pdf.columns]
     
             # Convert pandas.DataFrame to list of numpy records
             np_records = pdf.to_records(index=False)
     
             # Check if any columns need to be fixed for Spark to infer properly
             if len(np_records) > 0:
    -            record_type_list = self._get_numpy_record_dtypes(np_records[0])
    -            if record_type_list is not None:
    -                return [r.astype(record_type_list).tolist() for r in np_records], schema
    +            record_dtype = self._get_numpy_record_dtype(np_records[0])
    +            if record_dtype is not None:
    +                return [r.astype(record_dtype).tolist() for r in np_records]
     
             # Convert list of numpy records to python lists
    -        return [r.tolist() for r in np_records], schema
    +        return [r.tolist() for r in np_records]
    +
    +    def _create_from_pandas_with_arrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer, _create_batch
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType
    +        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
    +
    +        # Determine arrow types to coerce data when creating batches
    +        if isinstance(schema, StructType):
    +            arrow_types = [to_arrow_type(f.dataType) for f in schema.fields]
    +        elif isinstance(schema, DataType):
    +            raise ValueError("Single data type %s is not supported with Arrow" % str(schema))
    +        else:
    +            # Any timestamps must be coerced to be compatible with Spark
    +            arrow_types = [to_arrow_type(TimestampType())
    +                           if is_datetime64_dtype(t) or is_datetime64tz_dtype(t) else None
    +                           for t in pdf.dtypes]
    +
    +        # Slice the DataFrame to be batched
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        # Create Arrow record batches
    +        batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)])
    +                   for pdf_slice in pdf_slices]
    +
    +        # Create the Spark schema from the first Arrow batch (always at least 1 batch after slicing)
    +        if isinstance(schema, (list, tuple)):
    +            struct = from_arrow_schema(batches[0].schema)
    --- End diff --
    
    @BryanCutler, I think here we'd meet the same issue, SPARK-15244 in this code path. Mind opening a followup with a simple test if it is true?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #83018 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83018/testReport)** for PR 19459 at commit [`0de3126`](https://github.com/apache/spark/commit/0de3126240491577e92bc4452a5e1cc719ab5cc6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r146421602
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -510,6 +578,12 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
             except Exception:
                 has_pandas = False
             if has_pandas and isinstance(data, pandas.DataFrame):
    +            if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
    +                    and len(data) > 0:
    +                df = self._createFromPandasWithArrow(data, schema)
    +                # Fallback to create DataFrame without arrow if return None
    +                if df is not None:
    --- End diff --
    
    the PR from @ueshin better captures errors to show a warning, i'll merge that now


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #83569 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83569/testReport)** for PR 19459 at commit [`99ce1e4`](https://github.com/apache/spark/commit/99ce1e44f57c411af95b1c9d9c95f35f2c1652e1).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Looks pretty solid. Will take a another look today (KST) and merge this one in few days if there are no more comments and/or other committers are busy to take a look and merge.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Thanks for reviewing @viirya !
    
    I just had some followup questions at https://github.com/apache/spark/pull/19459#discussion_r144930424 and https://github.com/apache/spark/pull/19459#discussion_r144945183



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144618995
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
             except Exception:
                 has_pandas = False
             if has_pandas and isinstance(data, pandas.DataFrame):
    -            if schema is None:
    -                schema = [str(x) for x in data.columns]
    -            data = [r.tolist() for r in data.to_records(index=False)]
    +            if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
    +                    and len(data) > 0:
    +                from pyspark.serializers import ArrowSerializer
    +                from pyspark.sql.types import from_arrow_schema
    +                import pyarrow as pa
    +
    +                # Slice the DataFrame into batches
    +                split = -(-len(data) // self.sparkContext.defaultParallelism)  # round int up
    +                slices = (data[i:i + split] for i in xrange(0, len(data), split))
    +                batches = [pa.RecordBatch.from_pandas(sliced_df, preserve_index=False)
    +                           for sliced_df in slices]
    +
    +                # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +                import os
    +                from tempfile import NamedTemporaryFile
    +                tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +                try:
    +                    serializer = ArrowSerializer()
    +                    serializer.dump_stream(batches, tempFile)
    +                    tempFile.close()
    +                    readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +                    jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +                finally:
    +                    # readRDDFromFile eagerily reads the file so we can delete right after.
    +                    os.unlink(tempFile.name)
    +
    +                # Create the Spark DataFrame, there will be at least 1 batch
    +                schema = from_arrow_schema(batches[0].schema)
    +                jdf = self._jvm.PythonSQLUtils.arrowPayloadToDataFrame(
    +                    jrdd, schema.json(), self._wrapped._jsqlContext)
    +                df = DataFrame(jdf, self._wrapped)
    +                df._schema = schema
    --- End diff --
    
    Ahh, okay, that's fine to me.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #83647 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83647/testReport)** for PR 19459 at commit [`0ad736b`](https://github.com/apache/spark/commit/0ad736b352eacd394ea6ea684aa851853769e7d1).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145603645
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,73 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type, _cast_pandas_series_type
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        if schema is None or isinstance(schema, list):
    +            batches = [pa.RecordBatch.from_pandas(pdf_slice, preserve_index=False)
    +                       for pdf_slice in pdf_slices]
    +
    +            # There will be at least 1 batch after slicing the pandas.DataFrame
    +            schema_from_arrow = from_arrow_schema(batches[0].schema)
    +
    +            # If passed schema as a list of names then rename fields
    +            if isinstance(schema, list):
    +                fields = []
    +                for i, field in enumerate(schema_from_arrow):
    +                    field.name = schema[i]
    +                    fields.append(field)
    +                schema = StructType(fields)
    +            else:
    +                schema = schema_from_arrow
    +        else:
    +            batches = []
    +            for i, pdf_slice in enumerate(pdf_slices):
    +
    +                # convert to series to pyarrow.Arrays to use mask when creating Arrow batches
    +                arrs = []
    +                names = []
    +                for c, (_, series) in enumerate(pdf_slice.iteritems()):
    +                    field = schema[c]
    +                    names.append(field.name)
    +                    t = to_arrow_type(field.dataType)
    +                    try:
    +                        # NOTE: casting is not necessary with Arrow >= 0.7
    +                        arrs.append(pa.Array.from_pandas(_cast_pandas_series_type(series, t),
    +                                                         mask=series.isnull(), type=t))
    +                    except ValueError as e:
    +                        warnings.warn("Arrow will not be used in createDataFrame: %s" % str(e))
    +                        return None
    +                batches.append(pa.RecordBatch.from_arrays(arrs, names))
    +
    +                # Verify schema of first batch, return None if not equal and fallback without Arrow
    +                if i == 0:
    +                    schema_from_arrow = from_arrow_schema(batches[i].schema)
    +                    if schema != schema_from_arrow:
    +                        warnings.warn("Arrow will not be used in createDataFrame.\n" +
    --- End diff --
    
    Will we reach this block?
    I guess not because all datatypes are casted to the types specified by the schema otherwise some exception like `ValueError` are raised and fallback to withtout-Arrow.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145028238
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelsize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        import os
    +        from tempfile import NamedTemporaryFile
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(df) // self.sparkContext.defaultParallelism)  # round int up
    +        df_slices = (df[start:start + step] for start in xrange(0, len(df), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(df_slice, schema=arrow_schema, preserve_index=False)
    +                   for df_slice in df_slices]
    +
    +        # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +        try:
    +            serializer = ArrowSerializer()
    +            serializer.dump_stream(batches, tempFile)
    +            tempFile.close()
    +            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +            jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +        finally:
    +            # readRDDFromFile eagerily reads the file so we can delete right after.
    +            os.unlink(tempFile.name)
    +
    +        # Create the Spark DataFrame, there will be at least 1 batch
    +        schema = from_arrow_schema(batches[0].schema)
    --- End diff --
    
    Could we check ahead and fall back to `toPandas` without Arrow? I think I am seeing some differences in some cases, for example:
    
    ```python
    import pandas as pd
    import numpy as np
    
    spark.conf.set("spark.sql.execution.arrow.enabled", "false")
    spark.createDataFrame(pd.DataFrame(data={"intcol": [0.8]}), schema="intcol STRING").show()
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    spark.createDataFrame(pd.DataFrame(data={"intcol": [0.8]}), schema="intcol STRING").show()
    ```
    
    WDYT @BryanCutler and @ueshin ?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83761/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r150227714
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -454,13 +454,60 @@ def _convert_from_pandas(self, pdf, schema):
     
             # Check if any columns need to be fixed for Spark to infer properly
             if len(np_records) > 0:
    -            record_type_list = self._get_numpy_record_dtypes(np_records[0])
    -            if record_type_list is not None:
    -                return [r.astype(record_type_list).tolist() for r in np_records], schema
    +            record_dtype = self._get_numpy_record_dtype(np_records[0])
    +            if record_dtype is not None:
    +                return [r.astype(record_dtype).tolist() for r in np_records], schema
     
             # Convert list of numpy records to python lists
             return [r.tolist() for r in np_records], schema
     
    +    def _create_from_pandas_with_arrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer, _create_batch
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType
    +        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
    +
    +        # Determine arrow types to coerce data when creating batches
    +        if isinstance(schema, StructType):
    +            arrow_types = [to_arrow_type(f.dataType) for f in schema.fields]
    +        elif isinstance(schema, DataType):
    +            raise ValueError("Single data type %s is not supported with Arrow" % str(schema))
    +        else:
    +            # Any timestamps must be coerced to be compatible with Spark
    +            arrow_types = [to_arrow_type(TimestampType())
    +                           if is_datetime64_dtype(t) or is_datetime64tz_dtype(t) else None
    +                           for t in pdf.dtypes]
    +
    +        # Slice the DataFrame to be batched
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        # Create Arrow record batches
    +        batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)])
    +                   for pdf_slice in pdf_slices]
    +
    +        # Create the Spark schema from the first Arrow batch (always at least 1 batch after slicing)
    +        if schema is None or isinstance(schema, list):
    --- End diff --
    
    `isinstance(schema, list)` -> `isinstance(schema, (list, tuple))` maybe?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    LGTM too but let me leave it to @ueshin just in case.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82764/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83233/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #83647 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83647/testReport)** for PR 19459 at commit [`0ad736b`](https://github.com/apache/spark/commit/0ad736b352eacd394ea6ea684aa851853769e7d1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19459


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #82559 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82559/testReport)** for PR 19459 at commit [`e9c6de7`](https://github.com/apache/spark/commit/e9c6de737a939ce8cbe3c921955662661024420e).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144339301
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
             except Exception:
                 has_pandas = False
             if has_pandas and isinstance(data, pandas.DataFrame):
    -            if schema is None:
    -                schema = [str(x) for x in data.columns]
    -            data = [r.tolist() for r in data.to_records(index=False)]
    +            if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
    +                    and len(data) > 0:
    +                from pyspark.serializers import ArrowSerializer
    +                from pyspark.sql.types import from_arrow_schema
    +                import pyarrow as pa
    +
    +                # Slice the DataFrame into batches
    +                split = -(-len(data) // self.sparkContext.defaultParallelism)  # round int up
    +                slices = (data[i:i + split] for i in xrange(0, len(data), split))
    --- End diff --
    
    How about `split` -> `size` (or `length`) and `i` -> `offset` (or `start`)?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #82573 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82573/testReport)** for PR 19459 at commit [`9d667c6`](https://github.com/apache/spark/commit/9d667c6fcb7e47169a2e48ec130fbdbb42a21f41).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145032174
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelsize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        import os
    +        from tempfile import NamedTemporaryFile
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(df) // self.sparkContext.defaultParallelism)  # round int up
    +        df_slices = (df[start:start + step] for start in xrange(0, len(df), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(df_slice, schema=arrow_schema, preserve_index=False)
    +                   for df_slice in df_slices]
    +
    +        # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +        try:
    +            serializer = ArrowSerializer()
    +            serializer.dump_stream(batches, tempFile)
    +            tempFile.close()
    +            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +            jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +        finally:
    +            # readRDDFromFile eagerily reads the file so we can delete right after.
    +            os.unlink(tempFile.name)
    +
    +        # Create the Spark DataFrame, there will be at least 1 batch
    +        schema = from_arrow_schema(batches[0].schema)
    --- End diff --
    
    But I thought we should reduce the diff between `createDataFrame` with Arrow and `createDataFrame` without Arrow, and match the behaviour first though. To be clear, my suggestion is:
    
    > 2. We could check the user supplied schema matches the ArrowRecordBatch schema and fail immediately if not equal. I'm not sure if there might be some cases where you wouldn't want it to fail - like with different integer types..
    
    but if they are different, fall back to `createDataFrame` without Arrow to reduce the differences between them. I carefully guess `createDataFrame` is stricter?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r150321176
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -454,13 +454,60 @@ def _convert_from_pandas(self, pdf, schema):
     
             # Check if any columns need to be fixed for Spark to infer properly
             if len(np_records) > 0:
    -            record_type_list = self._get_numpy_record_dtypes(np_records[0])
    -            if record_type_list is not None:
    -                return [r.astype(record_type_list).tolist() for r in np_records], schema
    +            record_dtype = self._get_numpy_record_dtype(np_records[0])
    +            if record_dtype is not None:
    +                return [r.astype(record_dtype).tolist() for r in np_records], schema
     
             # Convert list of numpy records to python lists
             return [r.tolist() for r in np_records], schema
     
    +    def _create_from_pandas_with_arrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer, _create_batch
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType
    +        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
    +
    +        # Determine arrow types to coerce data when creating batches
    +        if isinstance(schema, StructType):
    +            arrow_types = [to_arrow_type(f.dataType) for f in schema.fields]
    +        elif isinstance(schema, DataType):
    +            raise ValueError("Single data type %s is not supported with Arrow" % str(schema))
    +        else:
    +            # Any timestamps must be coerced to be compatible with Spark
    +            arrow_types = [to_arrow_type(TimestampType())
    +                           if is_datetime64_dtype(t) or is_datetime64tz_dtype(t) else None
    +                           for t in pdf.dtypes]
    +
    +        # Slice the DataFrame to be batched
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        # Create Arrow record batches
    +        batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)])
    +                   for pdf_slice in pdf_slices]
    +
    +        # Create the Spark schema from the first Arrow batch (always at least 1 batch after slicing)
    +        if schema is None or isinstance(schema, list):
    +            schema_from_arrow = from_arrow_schema(batches[0].schema)
    +            names = pdf.columns if schema is None else schema
    --- End diff --
    
    Sure. We still need the line
    ```
    schema = [str(x) for x in pdf.columns] if schema is None else schema
    ```
    for the case when schema is None, because the pdf column names are lost when creating the Arrow RecordBatch from using pandas.Series with `_create_batch`.
    Otherwise, I think this makes it easier to follow too.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    @ueshin if possible I'd like to have #18664 merged first and then I can fix this PR up if needed, thanks!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r143788090
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
             except Exception:
                 has_pandas = False
             if has_pandas and isinstance(data, pandas.DataFrame):
    -            if schema is None:
    -                schema = [str(x) for x in data.columns]
    -            data = [r.tolist() for r in data.to_records(index=False)]
    +            if self.conf.get("spark.sql.execution.arrow.enable", "false").lower() == "true" \
    --- End diff --
    
    Oh thanks, I didn't see that go in.  I'll update.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Jenkins, retest this please.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83569/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #82866 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82866/testReport)** for PR 19459 at commit [`81ddfa9`](https://github.com/apache/spark/commit/81ddfa9afa03531edd9ac7b805a09be2be96d88c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144599111
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
             except Exception:
                 has_pandas = False
             if has_pandas and isinstance(data, pandas.DataFrame):
    -            if schema is None:
    -                schema = [str(x) for x in data.columns]
    -            data = [r.tolist() for r in data.to_records(index=False)]
    +            if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
    +                    and len(data) > 0:
    +                from pyspark.serializers import ArrowSerializer
    --- End diff --
    
    Oh wait, without Arrow it creates a (rdd, schema) pair like the others, so having with Arrow and without in `_createFromPandasDataFrame` doesn't fit because they return different things.  How about just putting the new arrow code in a function like `_createFromArrowPandas`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r149871432
  
    --- Diff: python/pyspark/serializers.py ---
    @@ -213,7 +213,15 @@ def __repr__(self):
             return "ArrowSerializer"
     
     
    -def _create_batch(series):
    +def _create_batch(series, copy=False):
    --- End diff --
    
    Hmm, I guess it depends.
    With the method, it can reduce the number of copy if `s` doesn't include null values, but also it might increase the number if `s` includes null values and `copy=True`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #83703 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83703/testReport)** for PR 19459 at commit [`6c72e37`](https://github.com/apache/spark/commit/6c72e37b0ca520d2756722ce2f18fae3ea32c39e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Benchmarks for running in local mode 16 GB memory, i7-4800MQ CPU @ 2.70GHz × 8 cores
    using default Spark configuration
    data is 10 columns of doubles with 100,000 rows
    
    Code:
    ```python
    import pandas as pd
    import numpy as np
    spark.conf.set("spark.sql.execution.arrow.enable", "false")
    pdf = pd.DataFrame(np.random.rand(100000, 10), columns=list("abcdefghij"))
    %timeit spark.createDataFrame(pdf)
    spark.conf.set("spark.sql.execution.arrow.enable", "true")
    %timeit spark.createDataFrame(pdf)
    ```
    
    Without Arrow: 
    1 loop, best of 3: 7.21 s per loop
    
    With Arrow:
    10 loops, best of 3: 30.6 ms per loop
    
    **Speedup of ~ 235x**
    
    Also, tested creating up to 2 million rows with Arrow and results scale linearly


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #83018 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83018/testReport)** for PR 19459 at commit [`0de3126`](https://github.com/apache/spark/commit/0de3126240491577e92bc4452a5e1cc719ab5cc6).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    I guess this pr is almost ready to be merged.
    I'd cc @gatorsmile @cloud-fan for another look.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #82894 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82894/testReport)** for PR 19459 at commit [`3052f30`](https://github.com/apache/spark/commit/3052f3063e965d3636dd172a6981d93155b77fd2).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145865969
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -510,6 +578,12 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
             except Exception:
                 has_pandas = False
             if has_pandas and isinstance(data, pandas.DataFrame):
    +            if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
    +                    and len(data) > 0:
    +                df = self._createFromPandasWithArrow(data, schema)
    --- End diff --
    
    As of https://github.com/apache/spark/pull/19459#issuecomment-337674952, `schema` from `_parse_datatype_string` could be not a `StructType`:
    
    https://github.com/apache/spark/blob/bfc7e1fe1ad5f9777126f2941e29bbe51ea5da7c/python/pyspark/sql/tests.py#L1325
    
    although I don't think we have supported this case with `pd.DataFrame` as `int` case resembles `Dataset` with primitive types, up to my knowledge:
    
    ```
    spark.createDataFrame(["a", "b"], "string").show()
    +-----+
    |value|
    +-----+
    |    a|
    |    b|
    +-----+
    ```
    
    For `pd.DataFrame` case, looks we always have a list of list.
    
    https://github.com/apache/spark/blob/d492cc5a21cd67b3999b85d97f5c41c3734b1ba3/python/pyspark/sql/session.py#L515
    
    So, I think we should only support list of strings maybe with a proper exception for `int` case.
    
    Of course, this case should work:
    
    ```
    >>> spark.createDataFrame(pd.DataFrame([1]), "struct<a: int>").show()
    +---+
    |  a|
    +---+
    |  1|
    +---+
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82560/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Merged in PR from @ueshin and added case for when schema is a string single datatype.  In addition using a `StructType`, now this handles specifying the schema with the following:
    
    ```
    spark.createDataFrame(pdf, ['name', 'age'])
    spark.createDataFrame(pdf, "a: string, b: int")
    spark.createDataFrame(pdf, "int")
    spark.createDataFrame(pdf, "struct<a: int>")
    ```
    
    @viirya brought up a good point here https://github.com/apache/spark/pull/19459#discussion_r145862488  (linking because it's outdated and hidden) - which shows another good reason to upgrade Arrow, I think


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r150228738
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -454,13 +454,60 @@ def _convert_from_pandas(self, pdf, schema):
     
             # Check if any columns need to be fixed for Spark to infer properly
             if len(np_records) > 0:
    -            record_type_list = self._get_numpy_record_dtypes(np_records[0])
    -            if record_type_list is not None:
    -                return [r.astype(record_type_list).tolist() for r in np_records], schema
    +            record_dtype = self._get_numpy_record_dtype(np_records[0])
    +            if record_dtype is not None:
    +                return [r.astype(record_dtype).tolist() for r in np_records], schema
     
             # Convert list of numpy records to python lists
             return [r.tolist() for r in np_records], schema
     
    +    def _create_from_pandas_with_arrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer, _create_batch
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType
    +        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
    +
    +        # Determine arrow types to coerce data when creating batches
    +        if isinstance(schema, StructType):
    +            arrow_types = [to_arrow_type(f.dataType) for f in schema.fields]
    +        elif isinstance(schema, DataType):
    +            raise ValueError("Single data type %s is not supported with Arrow" % str(schema))
    +        else:
    +            # Any timestamps must be coerced to be compatible with Spark
    +            arrow_types = [to_arrow_type(TimestampType())
    +                           if is_datetime64_dtype(t) or is_datetime64tz_dtype(t) else None
    +                           for t in pdf.dtypes]
    +
    +        # Slice the DataFrame to be batched
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        # Create Arrow record batches
    +        batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)])
    +                   for pdf_slice in pdf_slices]
    +
    +        # Create the Spark schema from the first Arrow batch (always at least 1 batch after slicing)
    +        if schema is None or isinstance(schema, list):
    --- End diff --
    
    Maybe, a test like `spark.createDataFrame([[1]], ("v",))` would be great.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83018/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r150206108
  
    --- Diff: python/pyspark/serializers.py ---
    @@ -225,11 +232,11 @@ def _create_batch(series):
         # If a nullable integer series has been promoted to floating point with NaNs, need to cast
         # NOTE: this is not necessary with Arrow >= 0.7
         def cast_series(s, t):
    -        if type(t) == pa.TimestampType:
    +        if t is not None and type(t) == pa.TimestampType:
    --- End diff --
    
    Hm, mind asking why `t is not None` is added? I thought `None` is `NoneType` and won't be `pa.TimestampType` anyway.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #83703 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83703/testReport)** for PR 19459 at commit [`6c72e37`](https://github.com/apache/spark/commit/6c72e37b0ca520d2756722ce2f18fae3ea32c39e).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145039356
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelsize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        import os
    +        from tempfile import NamedTemporaryFile
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(df) // self.sparkContext.defaultParallelism)  # round int up
    +        df_slices = (df[start:start + step] for start in xrange(0, len(df), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(df_slice, schema=arrow_schema, preserve_index=False)
    +                   for df_slice in df_slices]
    +
    +        # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +        try:
    +            serializer = ArrowSerializer()
    +            serializer.dump_stream(batches, tempFile)
    +            tempFile.close()
    +            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +            jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +        finally:
    +            # readRDDFromFile eagerily reads the file so we can delete right after.
    +            os.unlink(tempFile.name)
    +
    +        # Create the Spark DataFrame, there will be at least 1 batch
    +        schema = from_arrow_schema(batches[0].schema)
    --- End diff --
    
    Hm.. FWIW, I get something like:
    
    ```
      File "pyarrow/table.pxi", line 517, in pyarrow.lib.RecordBatch.from_pandas (/.../build/xhochy/pyarrow-macos-wheels/arrow/python/build/temp.macosx-10.6-intel-2.7/lib.cxx:28616)
      File "pyarrow/table.pxi", line 336, in pyarrow.lib._dataframe_to_arrays (/.../build/xhochy/pyarrow-macos-wheels/arrow/python/build/temp.macosx-10.6-intel-2.7/lib.cxx:26836)
      File "pyarrow/array.pxi", line 1157, in pyarrow.lib.Array.from_pandas (/.../build/xhochy/pyarrow-macos-wheels/arrow/python/build/temp.macosx-10.6-intel-2.7/lib.cxx:19825)
      File "pyarrow/error.pxi", line 66, in pyarrow.lib.check_status (/.../build/xhochy/pyarrow-macos-wheels/arrow/python/build/temp.macosx-10.6-intel-2.7/lib.cxx:7157)
    pyarrow.lib.ArrowNotImplementedError: NotImplemented: string
    ```
    
    ```
    >>> import pyarrow
    >>> pyarrow.__version__
    '0.4.0'
    >>> import pandas
    >>> pandas.__version__
    u'0.20.2'
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r143600411
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3147,6 +3150,14 @@ def test_filtered_frame(self):
             self.assertEqual(pdf.columns[0], "i")
             self.assertTrue(pdf.empty)
     
    +    def test_createDataFrame_toggle(self):
    +        pdf = self.createPandasDataFrameFromeData()
    +        self.spark.conf.set("spark.sql.execution.arrow.enable", "false")
    +        df_no_arrow = self.spark.createDataFrame(pdf)
    +        self.spark.conf.set("spark.sql.execution.arrow.enable", "true")
    --- End diff --
    
    I'd set this to `true` in `finally` just in case the test failed in `df_no_arrow = self.spark.createDataFrame(pdf)` and `spark.sql.execution.arrow.enable` reminds `false` affecting other test cases if I didn't miss something.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144926065
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
    --- End diff --
    
    Thanks! fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #82764 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82764/testReport)** for PR 19459 at commit [`f42e351`](https://github.com/apache/spark/commit/f42e35175969d8d7363e008a586a6f6982290447).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83703/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145863796
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,73 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type, _cast_pandas_series_type
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        if schema is None or isinstance(schema, list):
    +            batches = [pa.RecordBatch.from_pandas(pdf_slice, preserve_index=False)
    +                       for pdf_slice in pdf_slices]
    +
    +            # There will be at least 1 batch after slicing the pandas.DataFrame
    +            schema_from_arrow = from_arrow_schema(batches[0].schema)
    +
    +            # If passed schema as a list of names then rename fields
    +            if isinstance(schema, list):
    +                fields = []
    +                for i, field in enumerate(schema_from_arrow):
    +                    field.name = schema[i]
    +                    fields.append(field)
    +                schema = StructType(fields)
    +            else:
    +                schema = schema_from_arrow
    +        else:
    +            batches = []
    +            for i, pdf_slice in enumerate(pdf_slices):
    +
    +                # convert to series to pyarrow.Arrays to use mask when creating Arrow batches
    +                arrs = []
    +                names = []
    +                for c, (_, series) in enumerate(pdf_slice.iteritems()):
    +                    field = schema[c]
    +                    names.append(field.name)
    +                    t = to_arrow_type(field.dataType)
    +                    try:
    +                        # NOTE: casting is not necessary with Arrow >= 0.7
    +                        arrs.append(pa.Array.from_pandas(_cast_pandas_series_type(series, t),
    +                                                         mask=series.isnull(), type=t))
    +                    except ValueError as e:
    --- End diff --
    
    I think this guard only works to prevent casting like:
    ```python
    >>> s = pd.Series(["abc", "2", "10001"])                                                                           
    >>> s.astype(np.object_)                                                                                           
    0      abc
    1        2
    2    10001
    dtype: object
    >>> s
    0      abc
    1        2
    2    10001
    dtype: object
    >>> s.astype(np.int8)                                                                                              
    ...
    ValueError: invalid literal for long() with base 10: 'abc'
    ```
    For the casting that can cause overflow, this seems don't work.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144945183
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelsize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        import os
    +        from tempfile import NamedTemporaryFile
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(df) // self.sparkContext.defaultParallelism)  # round int up
    +        df_slices = (df[start:start + step] for start in xrange(0, len(df), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(df_slice, schema=arrow_schema, preserve_index=False)
    +                   for df_slice in df_slices]
    +
    +        # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +        try:
    +            serializer = ArrowSerializer()
    +            serializer.dump_stream(batches, tempFile)
    +            tempFile.close()
    +            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +            jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +        finally:
    +            # readRDDFromFile eagerily reads the file so we can delete right after.
    +            os.unlink(tempFile.name)
    +
    +        # Create the Spark DataFrame, there will be at least 1 batch
    +        schema = from_arrow_schema(batches[0].schema)
    --- End diff --
    
    This brings up a good point, the schema should usually be the same, but what should the behavior be if the user specifies the wrong schema?
    
    1. If use the passed in schema and it is wrong (the line in the comment above) then there might be an error later when an operation is performed.  For example, if a string column was specified as DoubleType that would produce:  
    "Caused by: java.lang.UnsupportedOperationException at org.apache.spark.sql.execution.vectorized.ArrowColumnVector$ArrowVectorAccessor.getDouble(ArrowColumnVector.java:395)"
    
    2.  We could check the user supplied schema matches the ArrowRecordBatch schema and fail immediately if not equal.  I'm not sure if there might be some cases where you wouldn't want it to fail - like with different integer types..
    
    3.  Always use the schema from the ArrowRecordBatch (how it is currently in this PR)
    
    I'm thinking that (2) is the best since it's the safest and would produce a clear error message.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144597485
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
             except Exception:
                 has_pandas = False
             if has_pandas and isinstance(data, pandas.DataFrame):
    -            if schema is None:
    -                schema = [str(x) for x in data.columns]
    -            data = [r.tolist() for r in data.to_records(index=False)]
    +            if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
    +                    and len(data) > 0:
    +                from pyspark.serializers import ArrowSerializer
    +                from pyspark.sql.types import from_arrow_schema
    +                import pyarrow as pa
    +
    +                # Slice the DataFrame into batches
    +                split = -(-len(data) // self.sparkContext.defaultParallelism)  # round int up
    +                slices = (data[i:i + split] for i in xrange(0, len(data), split))
    +                batches = [pa.RecordBatch.from_pandas(sliced_df, preserve_index=False)
    +                           for sliced_df in slices]
    +
    +                # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +                import os
    +                from tempfile import NamedTemporaryFile
    +                tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +                try:
    +                    serializer = ArrowSerializer()
    +                    serializer.dump_stream(batches, tempFile)
    +                    tempFile.close()
    +                    readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +                    jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +                finally:
    +                    # readRDDFromFile eagerily reads the file so we can delete right after.
    +                    os.unlink(tempFile.name)
    +
    +                # Create the Spark DataFrame, there will be at least 1 batch
    +                schema = from_arrow_schema(batches[0].schema)
    --- End diff --
    
    Good point. We can pass the schema, if provided, into `to_pandas` for pyarrow to use when creating the RecordBatch.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    After incorporating date and timestamp types for this, I had to refactor a little to use `_create_batch` from serializers to make Arrow batches from Columns even when the user doesn't specify the schema to be able to use the casts for these types. It doesn't seem to affect performance from the initial benchmark.
    
    I came across an issue when using pandas DataFrame with timestamps without Arrow.  Spark will read values as long and not datetime, so currently a test for this will fail
    
    ```
    In [1]: spark.conf.set("spark.sql.execution.arrow.enabled", "false")
    
    In [2]: import pandas as pd
       ...: from datetime import datetime
       ...: 
    
    In [3]: pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]})
    
    In [4]: df = spark.createDataFrame(pdf)
    
    In [5]: df.show()
    +-------------------+
    |                 ts|
    +-------------------+
    |1509411661000000000|
    +-------------------+
    
    
    In [6]: df.schema
    Out[6]: StructType(List(StructField(ts,LongType,true)))
    
    In [7]: pdf
    Out[7]: 
                       ts
    0 2017-10-31 01:01:01
    
    In [9]: pdf.dtypes
    Out[9]: 
    ts    datetime64[ns]
    dtype: object
    ```
    @HyukjinKwon or @ueshin could you confirm you see the same? and do you consider this a bug?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83001/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r149626358
  
    --- Diff: python/pyspark/serializers.py ---
    @@ -213,7 +213,15 @@ def __repr__(self):
             return "ArrowSerializer"
     
     
    -def _create_batch(series):
    +def _create_batch(series, copy=False):
    --- End diff --
    
    Do we need `copy` here?
    I might miss something but looks like all occurrence of `copy=copy` in this method are always copied by `s.fillna(0)` in advance so we don't need to use `copy=True`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    I made [SPARK-22417](https://issues.apache.org/jira/browse/SPARK-22417) for fixing reading from timestamps without arrow


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144600676
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
             except Exception:
                 has_pandas = False
             if has_pandas and isinstance(data, pandas.DataFrame):
    -            if schema is None:
    -                schema = [str(x) for x in data.columns]
    -            data = [r.tolist() for r in data.to_records(index=False)]
    +            if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
    +                    and len(data) > 0:
    +                from pyspark.serializers import ArrowSerializer
    +                from pyspark.sql.types import from_arrow_schema
    +                import pyarrow as pa
    +
    +                # Slice the DataFrame into batches
    +                split = -(-len(data) // self.sparkContext.defaultParallelism)  # round int up
    +                slices = (data[i:i + split] for i in xrange(0, len(data), split))
    --- End diff --
    
    Yea, sounds fine.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r146619813
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,52 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer, _create_batch
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        if schema is None or isinstance(schema, list):
    +            batches = [pa.RecordBatch.from_pandas(pdf_slice, preserve_index=False)
    +                       for pdf_slice in pdf_slices]
    +
    +            # There will be at least 1 batch after slicing the pandas.DataFrame
    +            schema_from_arrow = from_arrow_schema(batches[0].schema)
    +
    +            # If passed schema as a list of names then rename fields
    +            if isinstance(schema, list):
    +                fields = []
    +                for i, field in enumerate(schema_from_arrow):
    +                    field.name = schema[i]
    +                    fields.append(field)
    +                schema = StructType(fields)
    +            else:
    +                schema = schema_from_arrow
    +        else:
    +            if not isinstance(schema, StructType) and isinstance(schema, DataType):
    +                schema = StructType().add("value", schema)
    --- End diff --
    
    Sorry, I misunderstood. I'm fine with not supporting this case and falling back.  Like you pointed out, as this is, it doesn't make much sense to specify a single type for a pd.DataFrame.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144594930
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
             except Exception:
                 has_pandas = False
             if has_pandas and isinstance(data, pandas.DataFrame):
    -            if schema is None:
    -                schema = [str(x) for x in data.columns]
    -            data = [r.tolist() for r in data.to_records(index=False)]
    +            if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
    +                    and len(data) > 0:
    +                from pyspark.serializers import ArrowSerializer
    +                from pyspark.sql.types import from_arrow_schema
    +                import pyarrow as pa
    +
    +                # Slice the DataFrame into batches
    +                split = -(-len(data) // self.sparkContext.defaultParallelism)  # round int up
    +                slices = (data[i:i + split] for i in xrange(0, len(data), split))
    --- End diff --
    
    sure, how about size, start and step?  That looks like the terminology used in `parallelize`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145294433
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelsize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        import os
    +        from tempfile import NamedTemporaryFile
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(df) // self.sparkContext.defaultParallelism)  # round int up
    +        df_slices = (df[start:start + step] for start in xrange(0, len(df), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(df_slice, schema=arrow_schema, preserve_index=False)
    +                   for df_slice in df_slices]
    +
    +        # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +        try:
    +            serializer = ArrowSerializer()
    +            serializer.dump_stream(batches, tempFile)
    +            tempFile.close()
    +            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +            jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +        finally:
    +            # readRDDFromFile eagerily reads the file so we can delete right after.
    +            os.unlink(tempFile.name)
    +
    +        # Create the Spark DataFrame, there will be at least 1 batch
    +        schema = from_arrow_schema(batches[0].schema)
    --- End diff --
    
    Hmmmm.. still get the same exception with pyarrow 0.4.1 ..


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r150229857
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3180,6 +3185,58 @@ def test_filtered_frame(self):
             self.assertEqual(pdf.columns[0], "i")
             self.assertTrue(pdf.empty)
     
    +    def test_createDataFrame_toggle(self):
    +        pdf = self.create_pandas_data_frame()
    +        self.spark.conf.set("spark.sql.execution.arrow.enabled", "false")
    +        try:
    +            df_no_arrow = self.spark.createDataFrame(pdf)
    +        finally:
    +            self.spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    +        df_arrow = self.spark.createDataFrame(pdf)
    +        self.assertEquals(df_no_arrow.collect(), df_arrow.collect())
    +
    +    def test_createDataFrame_with_schema(self):
    +        pdf = self.create_pandas_data_frame()
    +        df = self.spark.createDataFrame(pdf, schema=self.schema)
    +        self.assertEquals(self.schema, df.schema)
    +        pdf_arrow = df.toPandas()
    +        self.assertFramesEqual(pdf_arrow, pdf)
    +
    +    def test_createDataFrame_with_incorrect_schema(self):
    +        pdf = self.create_pandas_data_frame()
    +        wrong_schema = StructType([field for field in reversed(self.schema)])
    --- End diff --
    
    Not a big deal at all: `StructType([field for field in reversed(self.schema)])` -> `StructType(list(reversed(st)))`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    @ueshin @HyukjinKwon does this look ready to merge?  cc @cloud-fan 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144706910
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    --- End diff --
    
    nit: df -> pdf.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82601/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #82601 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82601/testReport)** for PR 19459 at commit [`c7ddee6`](https://github.com/apache/spark/commit/c7ddee6b7ab91c1651a397a716ed91ed2a8383a3).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r143610100
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
    @@ -203,4 +205,16 @@ private[sql] object ArrowConverters {
           reader.close()
         }
       }
    +
    +  def toDataFrame(
    --- End diff --
    
    Yup, I think we should put it there.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145334576
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,39 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(pdf_slice, schema=arrow_schema, preserve_index=False)
    +                   for pdf_slice in pdf_slices]
    +
    +        # Verify schema, there will be at least 1 batch from pandas.DataFrame
    +        schema_from_arrow = from_arrow_schema(batches[0].schema)
    +        if schema is not None and schema != schema_from_arrow:
    +            raise ValueError("Supplied schema does not match result from Arrow\nsupplied: " +
    +                             "%s\n!=\nfrom Arrow: %s" % (str(schema), str(schema_from_arrow)))
    --- End diff --
    
    It's okay to fallback with warnings, but I think we should try to adjust types specified by users before that. Otherwise, users can never get the benefit from Arrow when users don't know how to adjust types especially integral types including NaN values.
    We can split pandas DataFrame into Series once and adjust types during building RecordBatches. I guess we should modify the timestamp values to have timezone for each Series when we support timestamp type anyway.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #83579 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83579/testReport)** for PR 19459 at commit [`99ce1e4`](https://github.com/apache/spark/commit/99ce1e44f57c411af95b1c9d9c95f35f2c1652e1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144750372
  
    --- Diff: python/pyspark/sql/dataframe.py ---
    @@ -70,12 +70,12 @@ class DataFrame(object):
         .. versionadded:: 1.3
         """
     
    -    def __init__(self, jdf, sql_ctx):
    +    def __init__(self, jdf, sql_ctx, schema=None):
             self._jdf = jdf
             self.sql_ctx = sql_ctx
             self._sc = sql_ctx and sql_ctx._sc
             self.is_cached = False
    -        self._schema = None  # initialized lazily
    +        self._schema = schema  # initialized lazily if None
    --- End diff --
    
    @BryanCutler, what do you think about taking this out back? Maybe, I am too much worried but I think we maybe should avoid it to be assigned actually except for few special cases ...


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #83761 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83761/testReport)** for PR 19459 at commit [`6c72e37`](https://github.com/apache/spark/commit/6c72e37b0ca520d2756722ce2f18fae3ea32c39e).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144750565
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala ---
    @@ -29,4 +32,19 @@ private[sql] object PythonSQLUtils {
       def listBuiltinFunctionInfos(): Array[ExpressionInfo] = {
         FunctionRegistry.functionSet.flatMap(f => FunctionRegistry.builtin.lookupFunction(f)).toArray
       }
    +
    +  /**
    +   * Python Callable function to convert ArrowPayloads into a [[DataFrame]].
    +   *
    +   * @param payloadRDD A JavaRDD of ArrowPayloads.
    +   * @param schemaString JSON Formatted Schema for ArrowPayloads.
    +   * @param sqlContext The active [[SQLContext]].
    +   * @return The converted [[DataFrame]].
    +   */
    +  def arrowPayloadToDataFrame(
    +       payloadRDD: JavaRDD[Array[Byte]],
    +       schemaString: String,
    +       sqlContext: SQLContext): DataFrame = {
    --- End diff --
    
    I can't believe I found this looks 5 spaces instead of 4.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    **[Test build #82573 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82573/testReport)** for PR 19459 at commit [`9d667c6`](https://github.com/apache/spark/commit/9d667c6fcb7e47169a2e48ec130fbdbb42a21f41).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Yes, I meant to ask for some clarification from @ueshin 
    for https://github.com/apache/spark/pull/19459#discussion_r145034007
    
    > Btw, do we also need to support schema like ['name', 'age'], "int"(not StructType), etc. from doctest?
    
    It looks like it handles the case when the schema is a single string [here](url), but I think you are referring to a list of strings that are column names right?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144931022
  
    --- Diff: python/pyspark/sql/dataframe.py ---
    @@ -70,12 +70,12 @@ class DataFrame(object):
         .. versionadded:: 1.3
         """
     
    -    def __init__(self, jdf, sql_ctx):
    +    def __init__(self, jdf, sql_ctx, schema=None):
             self._jdf = jdf
             self.sql_ctx = sql_ctx
             self._sc = sql_ctx and sql_ctx._sc
             self.is_cached = False
    -        self._schema = None  # initialized lazily
    +        self._schema = schema  # initialized lazily if None
    --- End diff --
    
    Sure, I can undo it.  I don't really like manually assigning the schema after the constructor but it's just done in these 2 special cases..


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r150308054
  
    --- Diff: python/pyspark/serializers.py ---
    @@ -225,11 +232,11 @@ def _create_batch(series):
         # If a nullable integer series has been promoted to floating point with NaNs, need to cast
         # NOTE: this is not necessary with Arrow >= 0.7
         def cast_series(s, t):
    -        if type(t) == pa.TimestampType:
    +        if t is not None and type(t) == pa.TimestampType:
    --- End diff --
    
    verified this is not an issue with pyarrow >= 0.7.1


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145796514
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,73 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type, _cast_pandas_series_type
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        if schema is None or isinstance(schema, list):
    +            batches = [pa.RecordBatch.from_pandas(pdf_slice, preserve_index=False)
    +                       for pdf_slice in pdf_slices]
    +
    +            # There will be at least 1 batch after slicing the pandas.DataFrame
    +            schema_from_arrow = from_arrow_schema(batches[0].schema)
    +
    +            # If passed schema as a list of names then rename fields
    +            if isinstance(schema, list):
    +                fields = []
    +                for i, field in enumerate(schema_from_arrow):
    +                    field.name = schema[i]
    +                    fields.append(field)
    +                schema = StructType(fields)
    +            else:
    +                schema = schema_from_arrow
    +        else:
    +            batches = []
    +            for i, pdf_slice in enumerate(pdf_slices):
    +
    +                # convert to series to pyarrow.Arrays to use mask when creating Arrow batches
    +                arrs = []
    +                names = []
    +                for c, (_, series) in enumerate(pdf_slice.iteritems()):
    +                    field = schema[c]
    +                    names.append(field.name)
    +                    t = to_arrow_type(field.dataType)
    +                    try:
    +                        # NOTE: casting is not necessary with Arrow >= 0.7
    +                        arrs.append(pa.Array.from_pandas(_cast_pandas_series_type(series, t),
    +                                                         mask=series.isnull(), type=t))
    +                    except ValueError as e:
    +                        warnings.warn("Arrow will not be used in createDataFrame: %s" % str(e))
    +                        return None
    +                batches.append(pa.RecordBatch.from_arrays(arrs, names))
    +
    +                # Verify schema of first batch, return None if not equal and fallback without Arrow
    +                if i == 0:
    +                    schema_from_arrow = from_arrow_schema(batches[i].schema)
    +                    if schema != schema_from_arrow:
    +                        warnings.warn("Arrow will not be used in createDataFrame.\n" +
    --- End diff --
    
    Thanks @ueshin! This does simplify things quite a bit, which I like.  My only concerns are that we rely on Arrow/Pandas to raise an error somewhere during the casting in order to fallback, and the fields in the Arrow record batches get arbitrary names (doesn't use schema names).  What are your thoughts @HyukjinKwon ?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19459
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82866/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145029289
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelsize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        import os
    +        from tempfile import NamedTemporaryFile
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(df) // self.sparkContext.defaultParallelism)  # round int up
    +        df_slices = (df[start:start + step] for start in xrange(0, len(df), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(df_slice, schema=arrow_schema, preserve_index=False)
    +                   for df_slice in df_slices]
    +
    +        # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +        try:
    +            serializer = ArrowSerializer()
    +            serializer.dump_stream(batches, tempFile)
    +            tempFile.close()
    +            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +            jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +        finally:
    +            # readRDDFromFile eagerily reads the file so we can delete right after.
    +            os.unlink(tempFile.name)
    +
    +        # Create the Spark DataFrame, there will be at least 1 batch
    +        schema = from_arrow_schema(batches[0].schema)
    --- End diff --
    
    There might be some differences because without Arrow, Spark just gets the column names from Pandas and infers the data type. This is by design, but I've seen a lot of users get tripped up by it and create JIRAs. With Arrow, it makes it easy to use the schema from Pandas so I would consider this an improvement.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145490665
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,39 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(pdf_slice, schema=arrow_schema, preserve_index=False)
    +                   for pdf_slice in pdf_slices]
    +
    +        # Verify schema, there will be at least 1 batch from pandas.DataFrame
    +        schema_from_arrow = from_arrow_schema(batches[0].schema)
    +        if schema is not None and schema != schema_from_arrow:
    +            raise ValueError("Supplied schema does not match result from Arrow\nsupplied: " +
    +                             "%s\n!=\nfrom Arrow: %s" % (str(schema), str(schema_from_arrow)))
    --- End diff --
    
    Ok, I'll give it a try to do the following:  if a schema is supplied, then try to adjust the types accordingly, then convert the first batch and check the schema from arrow matches the supplied, if it doesn't then produce a warning and fall back to conversion without arrow


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org