You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Takeshi Yamamuro (Jira)" <ji...@apache.org> on 2020/08/10 12:54:00 UTC

[jira] [Updated] (SPARK-32580) Issue accessing a column values after 'explode' function

     [ https://issues.apache.org/jira/browse/SPARK-32580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Takeshi Yamamuro updated SPARK-32580:
-------------------------------------
    Priority: Major  (was: Blocker)

> Issue accessing a column values after 'explode' function
> --------------------------------------------------------
>
>                 Key: SPARK-32580
>                 URL: https://issues.apache.org/jira/browse/SPARK-32580
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 3.0.0
>            Reporter: Ayrat Sadreev
>            Priority: Major
>
> An exception occurs when trying to flatten doubly nested arrays.
> The schema is
> {code:none}
> root
>  |-- data: array (nullable = true)
>  |    |-- element: struct (containsNull = true)
>  |    |    |-- item_id: string (nullable = true)
>  |    |    |-- timestamp: string (nullable = true)
>  |    |    |-- values: array (nullable = true)
>  |    |    |    |-- element: struct (containsNull = true)
>  |    |    |    |    |-- sample: double (nullable = true)
> {code}
> The target schema is
> {code:none}
> root
>  |-- item_id: string (nullable = true)
>  |-- timestamp: string (nullable = true)
>  |-- sample: double (nullable = true)
> {code}
> The code (in Java)
> {code:java}
> package com.skf.streamer.spark;
> import java.util.concurrent.TimeoutException;
> import org.apache.spark.SparkConf;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.functions;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructField;
> import org.apache.spark.sql.types.StructType;
> public class ExplodeTest {
>    public static void main(String[] args) throws TimeoutException {
>       SparkConf conf = new SparkConf()
>          .setAppName("SimpleApp")
>          .set("spark.scheduler.mode", "FAIR")
>          .set("spark.master", "local[1]")
>          .set("spark.sql.streaming.checkpointLocation", "checkpoint");
>       SparkSession spark = SparkSession.builder()
>          .config(conf)
>          .getOrCreate();
>       Dataset<Row> d0 = spark
>          .read()
>          .format("json")
>          .option("multiLine", "true")
>          .schema(getSchema())
>          .load("src/test/resources/explode/data.json");
>       d0.printSchema();
>       d0 = d0.withColumn("item", functions.explode(d0.col("data")));
>       d0 = d0.withColumn("value", functions.explode(d0.col("item.values")));
>       d0.printSchema();
>       d0 = d0.select(
>          d0.col("item.item_id"),
>          d0.col("item.timestamp"),
>          d0.col("value.sample")
>       );
>       d0.printSchema();
>       d0.show(); // Fails
>       spark.stop();
>    }
>    private static StructType getSchema() {
>       StructField[] level2Fields = {
>          DataTypes.createStructField("sample", DataTypes.DoubleType, false),
>       };
>       StructField[] level1Fields = {
>          DataTypes.createStructField("item_id", DataTypes.StringType, false),
>          DataTypes.createStructField("timestamp", DataTypes.StringType, false),
>          DataTypes.createStructField("values", DataTypes.createArrayType(DataTypes.createStructType(level2Fields)), false)
>       };
>       StructField[] fields = {
>          DataTypes.createStructField("data", DataTypes.createArrayType(DataTypes.createStructType(level1Fields)), false)
>       };
>       return DataTypes.createStructType(fields);
>    }
> }
> {code}
> The data file
> {code:json}
> {
>   "data": [
>     {
>       "item_id": "item_1",
>       "timestamp": "2020-07-01 12:34:89",
>       "values": [
>         {
>           "sample": 1.1
>         },
>         {
>           "sample": 1.2
>         }
>       ]
>     },
>     {
>       "item_id": "item_2",
>       "timestamp": "2020-07-02 12:34:89",
>       "values": [
>         {
>           "sample": 2.2
>         }
>       ]
>     }
>   ]
> }
> {code}
> The Dataset.show() method fails with an exception:
> {code:none}
> Caused by: java.lang.RuntimeException: Couldn't find _gen_alias_30#30 in [_gen_alias_28#28,_gen_alias_29#29]
> 	at scala.sys.package$.error(package.scala:30)
> 	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.$anonfun$applyOrElse$1(BoundAttribute.scala:81)
> 	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
> 	... 37 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org