You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ue...@apache.org on 2017/07/20 03:46:13 UTC

spark git commit: [SPARK-16542][SQL][PYSPARK] Fix bugs about types that result an array of null when creating DataFrame using python

Repository: spark
Updated Branches:
  refs/heads/master 2c9d5ef1f -> b7a40f64e


[SPARK-16542][SQL][PYSPARK] Fix bugs about types that result an array of null when creating DataFrame using python

## What changes were proposed in this pull request?
This is the reopen of https://github.com/apache/spark/pull/14198, with merge conflicts resolved.

ueshin Could you please take a look at my code?

Fix bugs about types that result an array of null when creating DataFrame using python.

Python's array.array have richer type than python itself, e.g. we can have `array('f',[1,2,3])` and `array('d',[1,2,3])`. Codes in spark-sql and pyspark didn't take this into consideration which might cause a problem that you get an array of null values when you have `array('f')` in your rows.

A simple code to reproduce this bug is:

```
from pyspark import SparkContext
from pyspark.sql import SQLContext,Row,DataFrame
from array import array

sc = SparkContext()
sqlContext = SQLContext(sc)

row1 = Row(floatarray=array('f',[1,2,3]), doublearray=array('d',[1,2,3]))
rows = sc.parallelize([ row1 ])
df = sqlContext.createDataFrame(rows)
df.show()
```

which have output

```
+---------------+------------------+
|    doublearray|        floatarray|
+---------------+------------------+
|[1.0, 2.0, 3.0]|[null, null, null]|
+---------------+------------------+
```

## How was this patch tested?

New test case added

Author: Xiang Gao <qa...@gmail.com>
Author: Gao, Xiang <qa...@gmail.com>
Author: Takuya UESHIN <ue...@databricks.com>

Closes #18444 from zasdfgbnm/fix_array_infer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7a40f64
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7a40f64
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7a40f64

Branch: refs/heads/master
Commit: b7a40f64e6d83bb6704ac3a63c46cc7c0e9f9e23
Parents: 2c9d5ef
Author: Xiang Gao <qa...@gmail.com>
Authored: Thu Jul 20 12:46:06 2017 +0900
Committer: Takuya UESHIN <ue...@databricks.com>
Committed: Thu Jul 20 12:46:06 2017 +0900

----------------------------------------------------------------------
 .../org/apache/spark/api/python/SerDeUtil.scala | 20 +++-
 python/pyspark/sql/tests.py                     | 97 +++++++++++++++++++-
 python/pyspark/sql/types.py                     | 95 ++++++++++++++++++-
 .../sql/execution/python/EvaluatePython.scala   | 10 ++
 4 files changed, 216 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b7a40f64/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 42f67e8..aaf8e7a 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -55,13 +55,12 @@ private[spark] object SerDeUtil extends Logging {
     //    {'d', sizeof(double), d_getitem, d_setitem},
     //    {'\0', 0, 0, 0} /* Sentinel */
     //  };
-    // TODO: support Py_UNICODE with 2 bytes
     val machineCodes: Map[Char, Int] = if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
-      Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9,
+      Map('B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9,
         'L' -> 11, 'l' -> 13, 'f' -> 15, 'd' -> 17, 'u' -> 21
       )
     } else {
-      Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8,
+      Map('B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8,
         'L' -> 10, 'l' -> 12, 'f' -> 14, 'd' -> 16, 'u' -> 20
       )
     }
@@ -72,7 +71,20 @@ private[spark] object SerDeUtil extends Logging {
         val typecode = args(0).asInstanceOf[String].charAt(0)
         // This must be ISO 8859-1 / Latin 1, not UTF-8, to interoperate correctly
         val data = args(1).asInstanceOf[String].getBytes(StandardCharsets.ISO_8859_1)
-        construct(typecode, machineCodes(typecode), data)
+        if (typecode == 'c') {
+          // It seems like the pickle of pypy uses the similar protocol to Python 2.6, which uses
+          // a string for array data instead of list as Python 2.7, and handles an array of
+          // typecode 'c' as 1-byte character.
+          val result = new Array[Char](data.length)
+          var i = 0
+          while (i < data.length) {
+            result(i) = data(i).toChar
+            i += 1
+          }
+          result
+        } else {
+          construct(typecode, machineCodes(typecode), data)
+        }
       } else if (args.length == 2 && args(0) == "l") {
         // On Python 2, an array of typecode 'l' should be handled as long rather than int.
         val values = args(1).asInstanceOf[JArrayList[_]]

http://git-wip-us.apache.org/repos/asf/spark/blob/b7a40f64/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index be5495c..1c1a0ca 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -30,8 +30,10 @@ import pickle
 import functools
 import time
 import datetime
-
+import array
+import ctypes
 import py4j
+
 try:
     import xmlrunner
 except ImportError:
@@ -58,6 +60,8 @@ from pyspark import SparkContext
 from pyspark.sql import SparkSession, SQLContext, HiveContext, Column, Row
 from pyspark.sql.types import *
 from pyspark.sql.types import UserDefinedType, _infer_type, _make_type_verifier
+from pyspark.sql.types import _array_signed_int_typecode_ctype_mappings, _array_type_mappings
+from pyspark.sql.types import _array_unsigned_int_typecode_ctype_mappings
 from pyspark.tests import QuietTest, ReusedPySparkTestCase, SparkSubmitTests
 from pyspark.sql.functions import UserDefinedFunction, sha2, lit
 from pyspark.sql.window import Window
@@ -2333,6 +2337,97 @@ class SQLTests(ReusedPySparkTestCase):
         df = self.spark.createDataFrame(data, schema=schema)
         df.collect()
 
+    # test for SPARK-16542
+    def test_array_types(self):
+        # This test need to make sure that the Scala type selected is at least
+        # as large as the python's types. This is necessary because python's
+        # array types depend on C implementation on the machine. Therefore there
+        # is no machine independent correspondence between python's array types
+        # and Scala types.
+        # See: https://docs.python.org/2/library/array.html
+
+        def assertCollectSuccess(typecode, value):
+            row = Row(myarray=array.array(typecode, [value]))
+            df = self.spark.createDataFrame([row])
+            self.assertEqual(df.first()["myarray"][0], value)
+
+        # supported string types
+        #
+        # String types in python's array are "u" for Py_UNICODE and "c" for char.
+        # "u" will be removed in python 4, and "c" is not supported in python 3.
+        supported_string_types = []
+        if sys.version_info[0] < 4:
+            supported_string_types += ['u']
+            # test unicode
+            assertCollectSuccess('u', u'a')
+        if sys.version_info[0] < 3:
+            supported_string_types += ['c']
+            # test string
+            assertCollectSuccess('c', 'a')
+
+        # supported float and double
+        #
+        # Test max, min, and precision for float and double, assuming IEEE 754
+        # floating-point format.
+        supported_fractional_types = ['f', 'd']
+        assertCollectSuccess('f', ctypes.c_float(1e+38).value)
+        assertCollectSuccess('f', ctypes.c_float(1e-38).value)
+        assertCollectSuccess('f', ctypes.c_float(1.123456).value)
+        assertCollectSuccess('d', sys.float_info.max)
+        assertCollectSuccess('d', sys.float_info.min)
+        assertCollectSuccess('d', sys.float_info.epsilon)
+
+        # supported signed int types
+        #
+        # The size of C types changes with implementation, we need to make sure
+        # that there is no overflow error on the platform running this test.
+        supported_signed_int_types = list(
+            set(_array_signed_int_typecode_ctype_mappings.keys())
+            .intersection(set(_array_type_mappings.keys())))
+        for t in supported_signed_int_types:
+            ctype = _array_signed_int_typecode_ctype_mappings[t]
+            max_val = 2 ** (ctypes.sizeof(ctype) * 8 - 1)
+            assertCollectSuccess(t, max_val - 1)
+            assertCollectSuccess(t, -max_val)
+
+        # supported unsigned int types
+        #
+        # JVM does not have unsigned types. We need to be very careful to make
+        # sure that there is no overflow error.
+        supported_unsigned_int_types = list(
+            set(_array_unsigned_int_typecode_ctype_mappings.keys())
+            .intersection(set(_array_type_mappings.keys())))
+        for t in supported_unsigned_int_types:
+            ctype = _array_unsigned_int_typecode_ctype_mappings[t]
+            assertCollectSuccess(t, 2 ** (ctypes.sizeof(ctype) * 8) - 1)
+
+        # all supported types
+        #
+        # Make sure the types tested above:
+        # 1. are all supported types
+        # 2. cover all supported types
+        supported_types = (supported_string_types +
+                           supported_fractional_types +
+                           supported_signed_int_types +
+                           supported_unsigned_int_types)
+        self.assertEqual(set(supported_types), set(_array_type_mappings.keys()))
+
+        # all unsupported types
+        #
+        # Keys in _array_type_mappings is a complete list of all supported types,
+        # and types not in _array_type_mappings are considered unsupported.
+        # `array.typecodes` are not supported in python 2.
+        if sys.version_info[0] < 3:
+            all_types = set(['c', 'b', 'B', 'u', 'h', 'H', 'i', 'I', 'l', 'L', 'f', 'd'])
+        else:
+            all_types = set(array.typecodes)
+        unsupported_types = all_types - set(supported_types)
+        # test unsupported types
+        for t in unsupported_types:
+            with self.assertRaises(TypeError):
+                a = array.array(t)
+                self.spark.createDataFrame([Row(myarray=a)]).collect()
+
     def test_bucketed_write(self):
         data = [
             (1, "foo", 3.0), (2, "foo", 5.0),

http://git-wip-us.apache.org/repos/asf/spark/blob/b7a40f64/python/pyspark/sql/types.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 22fa273..c376805 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -24,6 +24,7 @@ import json
 import re
 import base64
 from array import array
+import ctypes
 
 if sys.version >= "3":
     long = int
@@ -915,6 +916,93 @@ if sys.version < "3":
         long: LongType,
     })
 
+# Mapping Python array types to Spark SQL DataType
+# We should be careful here. The size of these types in python depends on C
+# implementation. We need to make sure that this conversion does not lose any
+# precision. Also, JVM only support signed types, when converting unsigned types,
+# keep in mind that it required 1 more bit when stored as singed types.
+#
+# Reference for C integer size, see:
+# ISO/IEC 9899:201x specification, chapter 5.2.4.2.1 Sizes of integer types <limits.h>.
+# Reference for python array typecode, see:
+# https://docs.python.org/2/library/array.html
+# https://docs.python.org/3.6/library/array.html
+# Reference for JVM's supported integral types:
+# http://docs.oracle.com/javase/specs/jvms/se8/html/jvms-2.html#jvms-2.3.1
+
+_array_signed_int_typecode_ctype_mappings = {
+    'b': ctypes.c_byte,
+    'h': ctypes.c_short,
+    'i': ctypes.c_int,
+    'l': ctypes.c_long,
+}
+
+_array_unsigned_int_typecode_ctype_mappings = {
+    'B': ctypes.c_ubyte,
+    'H': ctypes.c_ushort,
+    'I': ctypes.c_uint,
+    'L': ctypes.c_ulong
+}
+
+
+def _int_size_to_type(size):
+    """
+    Return the Catalyst datatype from the size of integers.
+    """
+    if size <= 8:
+        return ByteType
+    if size <= 16:
+        return ShortType
+    if size <= 32:
+        return IntegerType
+    if size <= 64:
+        return LongType
+
+# The list of all supported array typecodes is stored here
+_array_type_mappings = {
+    # Warning: Actual properties for float and double in C is not specified in C.
+    # On almost every system supported by both python and JVM, they are IEEE 754
+    # single-precision binary floating-point format and IEEE 754 double-precision
+    # binary floating-point format. And we do assume the same thing here for now.
+    'f': FloatType,
+    'd': DoubleType
+}
+
+# compute array typecode mappings for signed integer types
+for _typecode in _array_signed_int_typecode_ctype_mappings.keys():
+    size = ctypes.sizeof(_array_signed_int_typecode_ctype_mappings[_typecode]) * 8
+    dt = _int_size_to_type(size)
+    if dt is not None:
+        _array_type_mappings[_typecode] = dt
+
+# compute array typecode mappings for unsigned integer types
+for _typecode in _array_unsigned_int_typecode_ctype_mappings.keys():
+    # JVM does not have unsigned types, so use signed types that is at least 1
+    # bit larger to store
+    size = ctypes.sizeof(_array_unsigned_int_typecode_ctype_mappings[_typecode]) * 8 + 1
+    dt = _int_size_to_type(size)
+    if dt is not None:
+        _array_type_mappings[_typecode] = dt
+
+# Type code 'u' in Python's array is deprecated since version 3.3, and will be
+# removed in version 4.0. See: https://docs.python.org/3/library/array.html
+if sys.version_info[0] < 4:
+    _array_type_mappings['u'] = StringType
+
+# Type code 'c' are only available at python 2
+if sys.version_info[0] < 3:
+    _array_type_mappings['c'] = StringType
+
+# SPARK-21465:
+# In python2, array of 'L' happened to be mistakenly partially supported. To
+# avoid breaking user's code, we should keep this partial support. Below is a
+# dirty hacking to keep this partial support and make the unit test passes
+import platform
+if sys.version_info[0] < 3 and platform.python_implementation() != 'PyPy':
+    if 'L' not in _array_type_mappings.keys():
+        _array_type_mappings['L'] = LongType
+        _array_unsigned_int_typecode_ctype_mappings['L'] = ctypes.c_uint
+
 
 def _infer_type(obj):
     """Infer the DataType from obj
@@ -938,12 +1026,17 @@ def _infer_type(obj):
                 return MapType(_infer_type(key), _infer_type(value), True)
         else:
             return MapType(NullType(), NullType(), True)
-    elif isinstance(obj, (list, array)):
+    elif isinstance(obj, list):
         for v in obj:
             if v is not None:
                 return ArrayType(_infer_type(obj[0]), True)
         else:
             return ArrayType(NullType(), True)
+    elif isinstance(obj, array):
+        if obj.typecode in _array_type_mappings:
+            return ArrayType(_array_type_mappings[obj.typecode](), False)
+        else:
+            raise TypeError("not supported type: array(%s)" % obj.typecode)
     else:
         try:
             return _infer_schema(obj)

http://git-wip-us.apache.org/repos/asf/spark/blob/b7a40f64/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
index fcd8470..38b3aa7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
@@ -91,20 +91,30 @@ object EvaluatePython {
 
     case (c: Boolean, BooleanType) => c
 
+    case (c: Byte, ByteType) => c
+    case (c: Short, ByteType) => c.toByte
     case (c: Int, ByteType) => c.toByte
     case (c: Long, ByteType) => c.toByte
 
+    case (c: Byte, ShortType) => c.toShort
+    case (c: Short, ShortType) => c
     case (c: Int, ShortType) => c.toShort
     case (c: Long, ShortType) => c.toShort
 
+    case (c: Byte, IntegerType) => c.toInt
+    case (c: Short, IntegerType) => c.toInt
     case (c: Int, IntegerType) => c
     case (c: Long, IntegerType) => c.toInt
 
+    case (c: Byte, LongType) => c.toLong
+    case (c: Short, LongType) => c.toLong
     case (c: Int, LongType) => c.toLong
     case (c: Long, LongType) => c
 
+    case (c: Float, FloatType) => c
     case (c: Double, FloatType) => c.toFloat
 
+    case (c: Float, DoubleType) => c.toDouble
     case (c: Double, DoubleType) => c
 
     case (c: java.math.BigDecimal, dt: DecimalType) => Decimal(c, dt.precision, dt.scale)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org