Posted to commits@spark.apache.org by rx...@apache.org on 2016/06/21 17:47:54 UTC
spark git commit: [SPARK-13792][SQL] Addendum: Fix Python API
Repository: spark
Updated Branches:
refs/heads/master 4f83ca105 -> 93338807a
[SPARK-13792][SQL] Addendum: Fix Python API
## What changes were proposed in this pull request?
This is a follow-up to https://github.com/apache/spark/pull/13795 to properly set CSV options in the Python API. As part of this, I also make the Python option setting for both CSV and JSON more robust against positional errors by passing the options as keyword arguments instead of positionally.
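To illustrate the keyword-argument pattern the patch switches to, here is a minimal standalone sketch (the helper name and option list are hypothetical, not taken from readwriter.py). When a new parameter such as maxMalformedLogPerPartition is inserted before mode, a positional call silently binds the wrong values, while a keyword call is unaffected:

    # Illustrative sketch only; not part of the patch.
    def _set_csv_opts_sketch(sep=None, quote=None, maxCharsPerColumn=None,
                             maxMalformedLogPerPartition=None, mode=None):
        """Collect the non-None options into a dict, mirroring .option() calls."""
        opts = {}
        for name, value in [("sep", sep), ("quote", quote),
                            ("maxCharsPerColumn", maxCharsPerColumn),
                            ("maxMalformedLogPerPartition", maxMalformedLogPerPartition),
                            ("mode", mode)]:
            if value is not None:
                opts[name] = value
        return opts

    # Positional call: "PERMISSIVE" silently binds to maxMalformedLogPerPartition
    # once the new parameter is inserted before mode.
    print(_set_csv_opts_sketch(",", '"', 1000, "PERMISSIVE"))

    # Keyword call: robust to newly inserted parameters.
    print(_set_csv_opts_sketch(sep=",", quote='"', maxCharsPerColumn=1000, mode="PERMISSIVE"))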
## How was this patch tested?
N/A
Author: Reynold Xin <rx...@databricks.com>
Closes #13800 from rxin/SPARK-13792-2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93338807
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93338807
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93338807
Branch: refs/heads/master
Commit: 93338807aafdb2db9fb036ceadee1467cd367cdd
Parents: 4f83ca1
Author: Reynold Xin <rx...@databricks.com>
Authored: Tue Jun 21 10:47:51 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Jun 21 10:47:51 2016 -0700
----------------------------------------------------------------------
python/pyspark/sql/readwriter.py | 54 +++++++++++++++++++++--------------
1 file changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/93338807/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 89506ca..ccbf895 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -77,7 +77,7 @@ class ReaderUtils(object):
def _set_csv_opts(self, schema, sep, encoding, quote, escape,
comment, header, inferSchema, ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf,
- dateFormat, maxColumns, maxCharsPerColumn, mode):
+ dateFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode):
"""
Set options based on the CSV optional parameters
"""
@@ -115,6 +115,8 @@ class ReaderUtils(object):
self.option("maxColumns", maxColumns)
if maxCharsPerColumn is not None:
self.option("maxCharsPerColumn", maxCharsPerColumn)
+ if maxMalformedLogPerPartition is not None:
+ self.option("maxMalformedLogPerPartition", maxMalformedLogPerPartition)
if mode is not None:
self.option("mode", mode)
@@ -268,10 +270,12 @@ class DataFrameReader(ReaderUtils):
[('age', 'bigint'), ('name', 'string')]
"""
- self._set_json_opts(schema, primitivesAsString, prefersDecimal,
- allowComments, allowUnquotedFieldNames, allowSingleQuotes,
- allowNumericLeadingZero, allowBackslashEscapingAnyCharacter,
- mode, columnNameOfCorruptRecord)
+ self._set_json_opts(
+ schema=schema, primitivesAsString=primitivesAsString, prefersDecimal=prefersDecimal,
+ allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
+ allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
+ allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
+ mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -343,7 +347,8 @@ class DataFrameReader(ReaderUtils):
def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
- negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, mode=None):
+ negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
+ maxMalformedLogPerPartition=None, mode=None):
"""Loads a CSV file and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -408,11 +413,13 @@ class DataFrameReader(ReaderUtils):
>>> df.dtypes
[('_c0', 'string'), ('_c1', 'string')]
"""
-
- self._set_csv_opts(schema, sep, encoding, quote, escape,
- comment, header, inferSchema, ignoreLeadingWhiteSpace,
- ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf,
- dateFormat, maxColumns, maxCharsPerColumn, mode)
+ self._set_csv_opts(
+ schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
+ header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
+ ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
+ nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
+ dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+ maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
if isinstance(path, basestring):
path = [path]
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
@@ -958,10 +965,12 @@ class DataStreamReader(ReaderUtils):
>>> json_sdf.schema == sdf_schema
True
"""
- self._set_json_opts(schema, primitivesAsString, prefersDecimal,
- allowComments, allowUnquotedFieldNames, allowSingleQuotes,
- allowNumericLeadingZero, allowBackslashEscapingAnyCharacter,
- mode, columnNameOfCorruptRecord)
+ self._set_json_opts(
+ schema=schema, primitivesAsString=primitivesAsString, prefersDecimal=prefersDecimal,
+ allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
+ allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
+ allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
+ mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
if isinstance(path, basestring):
return self._df(self._jreader.json(path))
else:
@@ -1019,7 +1028,8 @@ class DataStreamReader(ReaderUtils):
def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
- negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, mode=None):
+ negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
+ maxMalformedLogPerPartition=None, mode=None):
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -1085,11 +1095,13 @@ class DataStreamReader(ReaderUtils):
>>> csv_sdf.schema == sdf_schema
True
"""
-
- self._set_csv_opts(schema, sep, encoding, quote, escape,
- comment, header, inferSchema, ignoreLeadingWhiteSpace,
- ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf,
- dateFormat, maxColumns, maxCharsPerColumn, mode)
+ self._set_csv_opts(
+ schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
+ header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
+ ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
+ nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
+ dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+ maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
else: