You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/06/21 17:47:54 UTC

spark git commit: [SPARK-13792][SQL] Addendum: Fix Python API

Repository: spark
Updated Branches:
  refs/heads/master 4f83ca105 -> 93338807a


[SPARK-13792][SQL] Addendum: Fix Python API

## What changes were proposed in this pull request?
This is a follow-up to https://github.com/apache/spark/pull/13795 to properly set CSV options in Python API. As part of this, I also make the Python option setting for both CSV and JSON more robust against positional errors.

## How was this patch tested?
N/A

Author: Reynold Xin <rx...@databricks.com>

Closes #13800 from rxin/SPARK-13792-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93338807
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93338807
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93338807

Branch: refs/heads/master
Commit: 93338807aafdb2db9fb036ceadee1467cd367cdd
Parents: 4f83ca1
Author: Reynold Xin <rx...@databricks.com>
Authored: Tue Jun 21 10:47:51 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Jun 21 10:47:51 2016 -0700

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py | 54 +++++++++++++++++++++--------------
 1 file changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/93338807/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 89506ca..ccbf895 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -77,7 +77,7 @@ class ReaderUtils(object):
     def _set_csv_opts(self, schema, sep, encoding, quote, escape,
                       comment, header, inferSchema, ignoreLeadingWhiteSpace,
                       ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf,
-                      dateFormat, maxColumns, maxCharsPerColumn, mode):
+                      dateFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode):
         """
         Set options based on the CSV optional parameters
         """
@@ -115,6 +115,8 @@ class ReaderUtils(object):
             self.option("maxColumns", maxColumns)
         if maxCharsPerColumn is not None:
             self.option("maxCharsPerColumn", maxCharsPerColumn)
+        if maxMalformedLogPerPartition is not None:
+            self.option("maxMalformedLogPerPartition", maxMalformedLogPerPartition)
         if mode is not None:
             self.option("mode", mode)
 
@@ -268,10 +270,12 @@ class DataFrameReader(ReaderUtils):
         [('age', 'bigint'), ('name', 'string')]
 
         """
-        self._set_json_opts(schema, primitivesAsString, prefersDecimal,
-                            allowComments, allowUnquotedFieldNames, allowSingleQuotes,
-                            allowNumericLeadingZero, allowBackslashEscapingAnyCharacter,
-                            mode, columnNameOfCorruptRecord)
+        self._set_json_opts(
+            schema=schema, primitivesAsString=primitivesAsString, prefersDecimal=prefersDecimal,
+            allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
+            allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
+            allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
+            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
@@ -343,7 +347,8 @@ class DataFrameReader(ReaderUtils):
     def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
-            negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, mode=None):
+            negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
+            maxMalformedLogPerPartition=None, mode=None):
         """Loads a CSV file and returns the result as a  :class:`DataFrame`.
 
         This function will go through the input once to determine the input schema if
@@ -408,11 +413,13 @@ class DataFrameReader(ReaderUtils):
         >>> df.dtypes
         [('_c0', 'string'), ('_c1', 'string')]
         """
-
-        self._set_csv_opts(schema, sep, encoding, quote, escape,
-                           comment, header, inferSchema, ignoreLeadingWhiteSpace,
-                           ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf,
-                           dateFormat, maxColumns, maxCharsPerColumn, mode)
+        self._set_csv_opts(
+            schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
+            header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
+            ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
+            nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
+            dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
         if isinstance(path, basestring):
             path = [path]
         return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
@@ -958,10 +965,12 @@ class DataStreamReader(ReaderUtils):
         >>> json_sdf.schema == sdf_schema
         True
         """
-        self._set_json_opts(schema, primitivesAsString, prefersDecimal,
-                            allowComments, allowUnquotedFieldNames, allowSingleQuotes,
-                            allowNumericLeadingZero, allowBackslashEscapingAnyCharacter,
-                            mode, columnNameOfCorruptRecord)
+        self._set_json_opts(
+            schema=schema, primitivesAsString=primitivesAsString, prefersDecimal=prefersDecimal,
+            allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
+            allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
+            allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
+            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
         if isinstance(path, basestring):
             return self._df(self._jreader.json(path))
         else:
@@ -1019,7 +1028,8 @@ class DataStreamReader(ReaderUtils):
     def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
-            negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, mode=None):
+            negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
+            maxMalformedLogPerPartition=None, mode=None):
         """Loads a CSV file stream and returns the result as a  :class:`DataFrame`.
 
         This function will go through the input once to determine the input schema if
@@ -1085,11 +1095,13 @@ class DataStreamReader(ReaderUtils):
         >>> csv_sdf.schema == sdf_schema
         True
         """
-
-        self._set_csv_opts(schema, sep, encoding, quote, escape,
-                           comment, header, inferSchema, ignoreLeadingWhiteSpace,
-                           ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf,
-                           dateFormat, maxColumns, maxCharsPerColumn, mode)
+        self._set_csv_opts(
+            schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
+            header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
+            ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
+            nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
+            dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
         if isinstance(path, basestring):
             return self._df(self._jreader.csv(path))
         else:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org