You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Bryan Cutler (JIRA)" <ji...@apache.org> on 2019/06/04 18:12:00 UTC

[jira] [Comment Edited] (SPARK-27939) Defining a schema with VectorUDT

    [ https://issues.apache.org/jira/browse/SPARK-27939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16855966#comment-16855966 ] 

Bryan Cutler edited comment on SPARK-27939 at 6/4/19 6:11 PM:
--------------------------------------------------------------

The problem is the {{Row}} class sorts the field names alphabetically, which puts capital letters first and then conflicts with your schema:
{noformat}
r = Row(features=SparseVector(135, {0: 139900.0, 1: 139900.0, ...}), SALESCLOSEPRICE=143000)

In [3]: r.__fields__
Out[3]: ['SALESCLOSEPRICE', 'features']{noformat}
This is by design, but it is not intuitive and has caused lots of problems. Hopefully, we can improve this for Spark 3.0.0

You can either just specify your data as tuples. for example
{noformat}
In [5]: train_df = spark.createDataFrame([(SparseVector(135, {0: 139900.0}), 143000)], schema=train_schema)

In [6]: train_df.show()
+--------------------+---------------+
| features|SALESCLOSEPRICE|
+--------------------+---------------+
|(135,[0],[139900.0])| 143000|
+--------------------+---------------+
{noformat}
Or if you want to have keywords, then define your own row class like this:
{noformat}
In [7]: MyRow = Row('features', 'SALESCLOSEPRICE')

In [8]: MyRow(SparseVector(135, {0: 139900.0}), 143000)
Out[8]: Row(features=SparseVector(135, {0: 139900.0}), SALESCLOSEPRICE=143000){noformat}


was (Author: bryanc):
The problem is the {{Row}} class sorts the field names alphabetically, which puts capital letters first and then conflicts with your schema:
{noformat}
r = Row(features=SparseVector(135, {0: 139900.0, 1: 139900.0, ...}), SALESCLOSEPRICE=143000)

In [3]: r.__fields__
Out[3]: ['SALESCLOSEPRICE', 'features']{noformat}
This is by design, but it is not intuitive and has caused lots of problems.

You can either just specify your data as tuples. for example
{noformat}
In [5]: train_df = spark.createDataFrame([(SparseVector(135, {0: 139900.0}), 143000)], schema=train_schema)

In [6]: train_df.show()
+--------------------+---------------+
| features|SALESCLOSEPRICE|
+--------------------+---------------+
|(135,[0],[139900.0])| 143000|
+--------------------+---------------+
{noformat}
Or if you want to have keywords, then define your own row class like this:
{noformat}
In [7]: MyRow = Row('features', 'SALESCLOSEPRICE')

In [8]: MyRow(SparseVector(135, {0: 139900.0}), 143000)
Out[8]: Row(features=SparseVector(135, {0: 139900.0}), SALESCLOSEPRICE=143000){noformat}

> Defining a schema with VectorUDT
> --------------------------------
>
>                 Key: SPARK-27939
>                 URL: https://issues.apache.org/jira/browse/SPARK-27939
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, PySpark
>    Affects Versions: 2.4.0
>            Reporter: Johannes Schaffrath
>            Priority: Minor
>
> When I try to define a dataframe schema which has a VectorUDT field, I run into an error when the VectorUDT field is not the last element of the StructType list.
> The following example causes the error below:
> {code:java}
> // from pyspark.sql import functions as F
> from pyspark.sql import types as T
> from pyspark.sql import Row
> from pyspark.ml.linalg import VectorUDT, SparseVector
> #VectorUDT should be the last structfield
> train_schema = T.StructType([
>                             T.StructField('features', VectorUDT()),
>                             T.StructField('SALESCLOSEPRICE', T.IntegerType())
>                             ])
>                           
> train_df = spark.createDataFrame(
> [Row(features=SparseVector(135, {0: 139900.0, 1: 139900.0, 2: 980.0, 3: 10.0, 5: 980.0, 6: 1858.0, 7: 1858.0, 8: 980.0, 9: 1950.0, 10: 1.28, 11: 1.0, 12: 1.0, 15: 2.0, 16: 3.0, 20: 2017.0, 21: 7.0, 22: 28.0, 23: 15.0, 24: 196.0, 25: 25.0, 26: -1.0, 27: 4.03, 28: 3.96, 29: 3.88, 30: 3.9, 31: 3.91, 32: 9.8, 33: 22.4, 34: 67.8, 35: 49.8, 36: 11.9, 37: 2.7, 38: 0.2926, 39: 142.7551, 40: 980.0, 41: 0.0133, 42: 1.5, 43: 1.0, 51: -1.0, 52: -1.0, 53: -1.0, 54: -1.0, 55: -1.0, 56: -1.0, 57: -1.0, 62: 1.0, 68: 1.0, 77: 1.0, 81: 1.0, 89: 1.0, 95: 1.0, 96: 1.0, 101: 1.0, 103: 1.0, 108: 1.0, 114: 1.0, 115: 1.0, 123: 1.0, 133: 1.0}), SALESCLOSEPRICE=143000),
>  Row(features=SparseVector(135, {0: 210000.0, 1: 210000.0, 2: 1144.0, 3: 4.0, 5: 1268.0, 6: 1640.0, 7: 1640.0, 8: 2228.0, 9: 1971.0, 10: 0.32, 11: 1.0, 14: 2.0, 15: 3.0, 16: 4.0, 17: 960.0, 20: 2017.0, 21: 10.0, 22: 41.0, 23: 9.0, 24: 282.0, 25: 2.0, 26: -1.0, 27: 3.91, 28: 3.85, 29: 3.83, 30: 3.83, 31: 3.78, 32: 32.2, 33: 49.0, 34: 18.8, 35: 14.0, 36: 35.8, 37: 14.6, 38: 0.4392, 39: 94.2549, 40: 2228.0, 41: 0.0078, 42: 1.3333, 43: -1.0, 44: -1.0, 45: -1.0, 46: -1.0, 47: -1.0, 48: -1.0, 49: -1.0, 50: -1.0, 52: 1.0, 55: -1.0, 56: -1.0, 57: -1.0, 62: 1.0, 68: 1.0, 77: 1.0, 79: 1.0, 89: 1.0, 92: 1.0, 96: 1.0, 101: 1.0, 103: 1.0, 108: 1.0, 114: 1.0, 115: 1.0, 124: 1.0, 133: 1.0}), SALESCLOSEPRICE=190000),
>  Row(features=SparseVector(135, {0: 225000.0, 1: 225000.0, 2: 1102.0, 3: 28.0, 5: 1102.0, 6: 2390.0, 7: 2390.0, 8: 1102.0, 9: 1949.0, 10: 0.822, 11: 1.0, 15: 1.0, 16: 2.0, 20: 2017.0, 21: 6.0, 22: 26.0, 23: 26.0, 24: 177.0, 25: 25.0, 26: -1.0, 27: 3.88, 28: 3.9, 29: 3.91, 30: 3.89, 31: 3.94, 32: 9.8, 33: 22.4, 34: 67.8, 35: 61.7, 36: 2.7, 38: 0.4706, 39: 204.1742, 40: 1102.0, 41: 0.0106, 42: 2.0, 49: 1.0, 51: -1.0, 52: -1.0, 53: -1.0, 54: -1.0, 57: 1.0, 62: 1.0, 68: 1.0, 70: 1.0, 79: 1.0, 89: 1.0, 92: 1.0, 96: 1.0, 100: 1.0, 103: 1.0, 108: 1.0, 110: 1.0, 115: 1.0, 123: 1.0, 131: 1.0, 132: 1.0}), SALESCLOSEPRICE=225000)
>  ], schema=train_schema)
>  
> train_df.printSchema()
> train_df.show()
> {code}
> Error  message:
> {code:java}
> // Fail to execute line 17: ], schema=train_schema) Traceback (most recent call last): File "/tmp/zeppelin_pyspark-3793375738105660281.py", line 375, in <module> exec(code, _zcUserQueryNameSpace) File "<stdin>", line 17, in <module> File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 748, in createDataFrame rdd, schema = self._createFromLocal(map(prepare, data), schema) File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 429, in _createFromLocal data = [schema.toInternal(row) for row in data] File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 429, in <listcomp> data = [schema.toInternal(row) for row in data] File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 604, in toInternal for f, v, c in zip(self.fields, obj, self._needConversion)) File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 604, in <genexpr> for f, v, c in zip(self.fields, obj, self._needConversion)) File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 442, in toInternal return self.dataType.toInternal(obj) File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 685, in toInternal return self._cachedSqlType().toInternal(self.serialize(obj)) File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/linalg/__init__.py", line 167, in serialize raise TypeError("cannot serialize %r of type %r" % (obj, type(obj))) TypeError: cannot serialize 143000 of type <class 'int'>{code}
> I don't get an error when I modify the order of the schema:
> {code:java}
> // from pyspark.sql import functions as F
> from pyspark.sql import types as T
> from pyspark.sql import Row
> from pyspark.ml.linalg import VectorUDT, SparseVector
> #VectorUDT should be the last structfield
> train_schema = T.StructType([
>                             T.StructField('SALESCLOSEPRICE', T.IntegerType()),
>                             T.StructField('features', VectorUDT())
>                             ])
>                           
> train_df = spark.createDataFrame(
> [Row(features=SparseVector(135, {0: 139900.0, 1: 139900.0, 2: 980.0, 3: 10.0, 5: 980.0, 6: 1858.0, 7: 1858.0, 8: 980.0, 9: 1950.0, 10: 1.28, 11: 1.0, 12: 1.0, 15: 2.0, 16: 3.0, 20: 2017.0, 21: 7.0, 22: 28.0, 23: 15.0, 24: 196.0, 25: 25.0, 26: -1.0, 27: 4.03, 28: 3.96, 29: 3.88, 30: 3.9, 31: 3.91, 32: 9.8, 33: 22.4, 34: 67.8, 35: 49.8, 36: 11.9, 37: 2.7, 38: 0.2926, 39: 142.7551, 40: 980.0, 41: 0.0133, 42: 1.5, 43: 1.0, 51: -1.0, 52: -1.0, 53: -1.0, 54: -1.0, 55: -1.0, 56: -1.0, 57: -1.0, 62: 1.0, 68: 1.0, 77: 1.0, 81: 1.0, 89: 1.0, 95: 1.0, 96: 1.0, 101: 1.0, 103: 1.0, 108: 1.0, 114: 1.0, 115: 1.0, 123: 1.0, 133: 1.0}), SALESCLOSEPRICE=143000),
>  Row(features=SparseVector(135, {0: 210000.0, 1: 210000.0, 2: 1144.0, 3: 4.0, 5: 1268.0, 6: 1640.0, 7: 1640.0, 8: 2228.0, 9: 1971.0, 10: 0.32, 11: 1.0, 14: 2.0, 15: 3.0, 16: 4.0, 17: 960.0, 20: 2017.0, 21: 10.0, 22: 41.0, 23: 9.0, 24: 282.0, 25: 2.0, 26: -1.0, 27: 3.91, 28: 3.85, 29: 3.83, 30: 3.83, 31: 3.78, 32: 32.2, 33: 49.0, 34: 18.8, 35: 14.0, 36: 35.8, 37: 14.6, 38: 0.4392, 39: 94.2549, 40: 2228.0, 41: 0.0078, 42: 1.3333, 43: -1.0, 44: -1.0, 45: -1.0, 46: -1.0, 47: -1.0, 48: -1.0, 49: -1.0, 50: -1.0, 52: 1.0, 55: -1.0, 56: -1.0, 57: -1.0, 62: 1.0, 68: 1.0, 77: 1.0, 79: 1.0, 89: 1.0, 92: 1.0, 96: 1.0, 101: 1.0, 103: 1.0, 108: 1.0, 114: 1.0, 115: 1.0, 124: 1.0, 133: 1.0}), SALESCLOSEPRICE=190000),
>  Row(features=SparseVector(135, {0: 225000.0, 1: 225000.0, 2: 1102.0, 3: 28.0, 5: 1102.0, 6: 2390.0, 7: 2390.0, 8: 1102.0, 9: 1949.0, 10: 0.822, 11: 1.0, 15: 1.0, 16: 2.0, 20: 2017.0, 21: 6.0, 22: 26.0, 23: 26.0, 24: 177.0, 25: 25.0, 26: -1.0, 27: 3.88, 28: 3.9, 29: 3.91, 30: 3.89, 31: 3.94, 32: 9.8, 33: 22.4, 34: 67.8, 35: 61.7, 36: 2.7, 38: 0.4706, 39: 204.1742, 40: 1102.0, 41: 0.0106, 42: 2.0, 49: 1.0, 51: -1.0, 52: -1.0, 53: -1.0, 54: -1.0, 57: 1.0, 62: 1.0, 68: 1.0, 70: 1.0, 79: 1.0, 89: 1.0, 92: 1.0, 96: 1.0, 100: 1.0, 103: 1.0, 108: 1.0, 110: 1.0, 115: 1.0, 123: 1.0, 131: 1.0, 132: 1.0}), SALESCLOSEPRICE=225000)
>  ], schema=train_schema)
>  
> train_df.printSchema()
> train_df.show()
> {code}
> Output:
> {code:java}
> // root 
>  |-- SALESCLOSEPRICE: integer (nullable = true) 
>  |-- features: vector (nullable = true) 
> +---------------+--------------------+ 
> |SALESCLOSEPRICE|            features| 
> +---------------+--------------------+ 
> |         143000|(135,[0,1,2,3,5,6...| 
> |         190000|(135,[0,1,2,3,5,6...| 
> |         225000|(135,[0,1,2,3,5,6...| 
> +---------------+--------------------+
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org