Posted to dev@phoenix.apache.org by "Alvaro Fernandez (Jira)" <ji...@apache.org> on 2021/09/21 20:06:00 UTC

[jira] [Updated] (PHOENIX-6559) spark connector access to SmallintArray / UnsignedSmallintArray columns

     [ https://issues.apache.org/jira/browse/PHOENIX-6559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alvaro Fernandez updated PHOENIX-6559:
--------------------------------------
    Description: 
We have some tables defined with SMALLINT ARRAY[] columns that cannot be read correctly through the Spark connector.

It seems that the Spark data type is inferred incorrectly by the connector as an array of integers, ArrayType(IntegerType), instead of ArrayType(ShortType).

An example table:
{code:java}
CREATE TABLE IF NOT EXISTS AEIDEV.ARRAY_TABLE (ID BIGINT NOT NULL PRIMARY KEY, COL1 SMALLINT ARRAY[]);
UPSERT INTO AEIDEV.ARRAY_TABLE VALUES (1, ARRAY[-32678,-9876,-234,-1]);
UPSERT INTO AEIDEV.ARRAY_TABLE VALUES (2, ARRAY[0,8,9,10]);
UPSERT INTO AEIDEV.ARRAY_TABLE VALUES (3, ARRAY[123,1234,12345,32767]);{code}
Accessing the data from Spark returns wrong values:
{code:java}
scala> val df = spark.sqlContext.read.format("org.apache.phoenix.spark").option("table","AEIDEV.ARRAY_TABLE").option("zkUrl","ithdp1101.cern.ch:2181").load
df: org.apache.spark.sql.DataFrame = [ID: bigint, COL1: array<int>]

scala> df.show
+--+--------------------+
|ID|                COL1|
+--+--------------------+
| 1|[-647200678, -234...|
| 2|[524288, 655369, ...|
| 3|[80871547, 214743...|
+--+--------------------+

scala> df.collect
res3: Array[org.apache.spark.sql.Row] = Array([1,WrappedArray(-647200678, -234, 0, 0)], [2,WrappedArray(524288, 655369, 0, 0)], [3,WrappedArray(80871547, 2147430457, 0, 0)])
{code}
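As a side note (our own analysis, not part of the attached patch): the corrupted numbers are exactly what you get by reinterpreting the decoded two-byte elements as four-byte little-endian integers, which is consistent with the element width being lost somewhere in the conversion. A minimal illustration for row 1:
{code:java}
import java.nio.{ByteBuffer, ByteOrder}

// Pack the four SMALLINT values of row 1 as 2-byte little-endian shorts...
val buf = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
Seq(-32678, -9876, -234, -1).foreach(v => buf.putShort(v.toShort))
buf.flip()

// ...and read the same bytes back as 4-byte ints: the two ints that fit in
// the buffer are precisely the garbled values reported above.
println(buf.getInt()) // -647200678
println(buf.getInt()) // -234
{code}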
We have identified the problem in the SparkSchemaUtil class and applied the small patch attached to this report (SparkSchemaUtil.patch). After this, the data type is inferred correctly and the results are correct:
{code:java}
scala> val df = spark.sqlContext.read.format("org.apache.phoenix.spark").option("table","AEIDEV.ARRAY_TABLE").option("zkUrl","ithdp1101.cern.ch:2181").load
df: org.apache.spark.sql.DataFrame = [ID: bigint, COL1: array<smallint>]

scala> df.show
+--+--------------------+
|ID|                COL1|
+--+--------------------+
| 1|[-32678, -9876, -...|
| 2|       [0, 8, 9, 10]|
| 3|[123, 1234, 12345...|
+--+--------------------+

scala> df.collect
res1: Array[org.apache.spark.sql.Row] = Array([1,WrappedArray(-32678, -9876, -234, -1)], [2,WrappedArray(0, 8, 9, 10)], [3,WrappedArray(123, 1234, 12345, 32767)])
{code}
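The attached SparkSchemaUtil.patch is authoritative; the following is only a sketch of the kind of mapping that was missing, with illustrative method and case names (the actual code in the connector may be organized differently):
{code:java}
import org.apache.phoenix.schema.types._
import org.apache.spark.sql.types._

// Sketch: map a Phoenix array PDataType to a Catalyst ArrayType with the
// correct element type. Before the fix, SMALLINT arrays ended up mapped to
// ArrayType(IntegerType); the first case below is the missing mapping.
def catalystArrayType(t: PDataType[_]): DataType = t match {
  case _: PSmallintArray | _: PUnsignedSmallintArray =>
    ArrayType(ShortType, containsNull = true) // added: 2-byte elements
  case _: PIntegerArray | _: PUnsignedIntArray =>
    ArrayType(IntegerType, containsNull = true)
  case other =>
    throw new UnsupportedOperationException(s"Unsupported array type: $other")
}
{code}
Mapping to ShortType keeps the two-byte element width end to end, so Spark reads the decoded elements at the width Phoenix actually stored.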

We can provide more information and submit a merge request if needed.
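If useful for that merge request, a minimal end-to-end regression check could look like this (a sketch against the session above, not part of the attached patch):
{code:java}
// Hypothetical check: reading the table back must yield exactly the
// SMALLINT values that were upserted.
val expected = Map(
  1L -> Seq[Short](-32678, -9876, -234, -1),
  2L -> Seq[Short](0, 8, 9, 10),
  3L -> Seq[Short](123, 1234, 12345, 32767)
)
val actual = df.collect().map(r => r.getLong(0) -> r.getSeq[Short](1)).toMap
assert(actual == expected, s"unexpected values: $actual")
{code}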

> spark connector access to SmallintArray / UnsignedSmallintArray columns
> -----------------------------------------------------------------------
>
>                 Key: PHOENIX-6559
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6559
>             Project: Phoenix
>          Issue Type: Bug
>          Components: connectors, spark-connector
>    Affects Versions: connectors-6.0.0
>            Reporter: Alvaro Fernandez
>            Priority: Major
>         Attachments: SparkSchemaUtil.patch
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)