Posted to issues@spark.apache.org by "Takeshi Yamamuro (Jira)" <ji...@apache.org> on 2020/08/11 12:07:00 UTC
[jira] [Resolved] (SPARK-32580) Issue accessing column values after 'explode' function
[ https://issues.apache.org/jira/browse/SPARK-32580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Takeshi Yamamuro resolved SPARK-32580.
--------------------------------------
Fix Version/s: 3.0.1
Resolution: Fixed
> Issue accessing column values after 'explode' function
> -------------------------------------------------------
>
> Key: SPARK-32580
> URL: https://issues.apache.org/jira/browse/SPARK-32580
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Ayrat Sadreev
> Priority: Major
> Fix For: 3.0.1
>
> Attachments: ExplodeTest.java, data.json
>
>
> An exception occurs when trying to flatten doubly nested arrays.
> The schema is:
> {code:none}
> root
>  |-- data: array (nullable = true)
>  |    |-- element: struct (containsNull = true)
>  |    |    |-- item_id: string (nullable = true)
>  |    |    |-- timestamp: string (nullable = true)
>  |    |    |-- values: array (nullable = true)
>  |    |    |    |-- element: struct (containsNull = true)
>  |    |    |    |    |-- sample: double (nullable = true)
> {code}
> The target schema is:
> {code:none}
> root
>  |-- item_id: string (nullable = true)
>  |-- timestamp: string (nullable = true)
>  |-- sample: double (nullable = true)
> {code}
> The reproduction code (in Java):
> {code:java}
> package com.skf.streamer.spark;
>
> import java.util.concurrent.TimeoutException;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.functions;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructField;
> import org.apache.spark.sql.types.StructType;
>
> public class ExplodeTest {
>
>     public static void main(String[] args) throws TimeoutException {
>         SparkConf conf = new SparkConf()
>                 .setAppName("SimpleApp")
>                 .set("spark.scheduler.mode", "FAIR")
>                 .set("spark.master", "local[1]")
>                 .set("spark.sql.streaming.checkpointLocation", "checkpoint");
>
>         SparkSession spark = SparkSession.builder()
>                 .config(conf)
>                 .getOrCreate();
>
>         Dataset<Row> d0 = spark
>                 .read()
>                 .format("json")
>                 .option("multiLine", "true")
>                 .schema(getSchema())
>                 .load("src/test/resources/explode/data.json");
>         d0.printSchema();
>
>         d0 = d0.withColumn("item", functions.explode(d0.col("data")));
>         d0 = d0.withColumn("value", functions.explode(d0.col("item.values")));
>         d0.printSchema();
>
>         d0 = d0.select(
>                 d0.col("item.item_id"),
>                 d0.col("item.timestamp"),
>                 d0.col("value.sample")
>         );
>         d0.printSchema();
>
>         d0.show(); // Fails
>
>         spark.stop();
>     }
>
>     private static StructType getSchema() {
>         StructField[] level2Fields = {
>                 DataTypes.createStructField("sample", DataTypes.DoubleType, false)
>         };
>         StructField[] level1Fields = {
>                 DataTypes.createStructField("item_id", DataTypes.StringType, false),
>                 DataTypes.createStructField("timestamp", DataTypes.StringType, false),
>                 DataTypes.createStructField("values", DataTypes.createArrayType(DataTypes.createStructType(level2Fields)), false)
>         };
>         StructField[] fields = {
>                 DataTypes.createStructField("data", DataTypes.createArrayType(DataTypes.createStructType(level1Fields)), false)
>         };
>         return DataTypes.createStructType(fields);
>     }
> }
> {code}
> The data file:
> {code:json}
> {
>   "data": [
>     {
>       "item_id": "item_1",
>       "timestamp": "2020-07-01 12:34:89",
>       "values": [
>         {
>           "sample": 1.1
>         },
>         {
>           "sample": 1.2
>         }
>       ]
>     },
>     {
>       "item_id": "item_2",
>       "timestamp": "2020-07-02 12:34:89",
>       "values": [
>         {
>           "sample": 2.2
>         }
>       ]
>     }
>   ]
> }
> {code}
> The Dataset.show() method fails with an exception:
> {code:none}
> Caused by: java.lang.RuntimeException: Couldn't find _gen_alias_30#30 in [_gen_alias_28#28,_gen_alias_29#29]
> at scala.sys.package$.error(package.scala:30)
> at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.$anonfun$applyOrElse$1(BoundAttribute.scala:81)
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
> ... 37 more
> {code}
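> A possible workaround, not part of the original report and only a sketch against this reproduction: perform the second explode inside a single select() together with the fields that are needed, instead of chaining withColumn() calls that re-reference nested fields of the exploded struct. Whether this avoids the failure on 3.0.0 is an assumption.
> {code:java}
> // First explode as before.
> d0 = d0.withColumn("item", functions.explode(d0.col("data")));
> // Second explode done in the same projection as the struct fields,
> // so no later step has to resolve "item.*" through the generator.
> d0 = d0.select(
>         d0.col("item.item_id"),
>         d0.col("item.timestamp"),
>         functions.explode(d0.col("item.values")).alias("value")
> );
> d0 = d0.select(
>         d0.col("item_id"),
>         d0.col("timestamp"),
>         d0.col("value.sample")
> );
> d0.show();
> {code}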
--
This message was sent by Atlassian Jira
(v8.3.4#803005)