Posted to issues@spark.apache.org by "Ayrat Sadreev (Jira)" <ji...@apache.org> on 2020/08/10 11:14:00 UTC
[jira] [Created] (SPARK-32580) Issue accessing a column values after 'explode' function
Ayrat Sadreev created SPARK-32580:
-------------------------------------
Summary: Issue accessing a column values after 'explode' function
Key: SPARK-32580
URL: https://issues.apache.org/jira/browse/SPARK-32580
Project: Spark
Issue Type: Bug
Components: Spark Core, SQL
Affects Versions: 3.0.0
Reporter: Ayrat Sadreev
An exception occurs when trying to flatten doubly nested arrays.
The input schema is:
{code:none}
root
|-- data: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- item_id: string (nullable = true)
| | |-- timestamp: string (nullable = true)
| | |-- values: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- sample: double (nullable = true)
{code}
The target schema is:
{code:none}
root
|-- item_id: string (nullable = true)
|-- timestamp: string (nullable = true)
|-- sample: double (nullable = true)
{code}
The code (in Java):
{code:java}
package com.skf.streamer.spark;

import java.util.concurrent.TimeoutException;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class ExplodeTest {

    public static void main(String[] args) throws TimeoutException {
        SparkConf conf = new SparkConf()
                .setAppName("SimpleApp")
                .set("spark.scheduler.mode", "FAIR")
                .set("spark.master", "local[1]")
                .set("spark.sql.streaming.checkpointLocation", "checkpoint");

        SparkSession spark = SparkSession.builder()
                .config(conf)
                .getOrCreate();

        // Read the multi-line JSON file with the explicit schema defined below
        Dataset<Row> d0 = spark
                .read()
                .format("json")
                .option("multiLine", "true")
                .schema(getSchema())
                .load("src/test/resources/explode/data.json");
        d0.printSchema();

        // First explode: one row per element of "data"
        d0 = d0.withColumn("item", functions.explode(d0.col("data")));
        // Second explode: one row per element of "item.values"
        d0 = d0.withColumn("value", functions.explode(d0.col("item.values")));
        d0.printSchema();

        // Project the nested fields into the flat target schema
        d0 = d0.select(
                d0.col("item.item_id"),
                d0.col("item.timestamp"),
                d0.col("value.sample")
        );
        d0.printSchema();
        d0.show(); // Fails with the exception shown below

        spark.stop();
    }

    // Schema: data: array<struct<item_id, timestamp, values: array<struct<sample>>>>
    private static StructType getSchema() {
        StructField[] level2Fields = {
                DataTypes.createStructField("sample", DataTypes.DoubleType, false),
        };
        StructField[] level1Fields = {
                DataTypes.createStructField("item_id", DataTypes.StringType, false),
                DataTypes.createStructField("timestamp", DataTypes.StringType, false),
                DataTypes.createStructField("values",
                        DataTypes.createArrayType(DataTypes.createStructType(level2Fields)), false)
        };
        StructField[] fields = {
                DataTypes.createStructField("data",
                        DataTypes.createArrayType(DataTypes.createStructType(level1Fields)), false)
        };
        return DataTypes.createStructType(fields);
    }
}
{code}
The data file (src/test/resources/explode/data.json):
{code:json}
{
  "data": [
    {
      "item_id": "item_1",
      "timestamp": "2020-07-01 12:34:89",
      "values": [
        {
          "sample": 1.1
        },
        {
          "sample": 1.2
        }
      ]
    },
    {
      "item_id": "item_2",
      "timestamp": "2020-07-02 12:34:89",
      "values": [
        {
          "sample": 2.2
        }
      ]
    }
  ]
}
{code}
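For comparison, this is the result the query should produce for this file. `explode` emits one row per array element (null or empty arrays produce no rows), so the two calls amount to a nested flat-map over `data` and then `values`. The sketch below is a plain-Java model with no Spark dependency; the class and method names (`ExplodeModel`, `flatten`, `sampleData`, `OutputRow`) are hypothetical, and null arrays or structs are not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model (no Spark) of the expected result of the
// report's query. Null arrays and null structs are not modeled.
class ExplodeModel {

    // One element of "values": struct<sample: double>
    static final class Value {
        final double sample;

        Value(double sample) {
            this.sample = sample;
        }
    }

    // One element of "data": struct<item_id, timestamp, values>
    static final class Item {
        final String itemId;
        final String timestamp;
        final List<Value> values;

        Item(String itemId, String timestamp, List<Value> values) {
            this.itemId = itemId;
            this.timestamp = timestamp;
            this.values = values;
        }
    }

    // One row of the target schema: item_id, timestamp, sample
    static final class OutputRow {
        final String itemId;
        final String timestamp;
        final double sample;

        OutputRow(String itemId, String timestamp, double sample) {
            this.itemId = itemId;
            this.timestamp = timestamp;
            this.sample = sample;
        }

        @Override
        public String toString() {
            return itemId + " | " + timestamp + " | " + sample;
        }
    }

    // explode(data) followed by explode(item.values) behaves like a nested
    // flat-map: one output row per (item, value) pair; empty lists yield no rows.
    static List<OutputRow> flatten(List<Item> data) {
        List<OutputRow> rows = new ArrayList<>();
        for (Item item : data) {              // first explode: one row per item
            for (Value value : item.values) { // second explode: one row per value
                rows.add(new OutputRow(item.itemId, item.timestamp, value.sample));
            }
        }
        return rows;
    }

    // The contents of data.json from the report
    static List<Item> sampleData() {
        return List.of(
                new Item("item_1", "2020-07-01 12:34:89",
                        List.of(new Value(1.1), new Value(1.2))),
                new Item("item_2", "2020-07-02 12:34:89",
                        List.of(new Value(2.2))));
    }

    public static void main(String[] args) {
        for (OutputRow row : flatten(sampleData())) {
            System.out.println(row);
        }
    }
}
```

Running `main` prints three rows: item_1 with samples 1.1 and 1.2, then item_2 with sample 2.2. That is the table one would expect `d0.show()` to display in the target schema.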
The Dataset.show() call fails with the following exception:
{code:none}
Caused by: java.lang.RuntimeException: Couldn't find _gen_alias_30#30 in [_gen_alias_28#28,_gen_alias_29#29]
at scala.sys.package$.error(package.scala:30)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.$anonfun$applyOrElse$1(BoundAttribute.scala:81)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 37 more
{code}
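Reading the message: before execution, Spark binds each attribute that a plan node references to a position in its child's output (the stack trace points at BindReferences in BoundAttribute.scala). Here the plan references `_gen_alias_30#30`, but the child only outputs `_gen_alias_28#28` and `_gen_alias_29#29`. The `_gen_alias_` prefix appears to come from aliases the optimizer introduces when pruning nested fields, although the report does not confirm which rule produced it. The sketch below is a simplified, hypothetical model of just the failing lookup (`BindModel` and `bind` are illustrative names, not Spark's implementation).

```java
import java.util.List;

// Hypothetical, simplified model of binding an attribute reference to an
// ordinal in the child's output. This is NOT Spark's code; it only mirrors
// the shape of the lookup that fails in the report.
class BindModel {

    // Returns the position of `attr` in `childOutput`, or throws with a
    // message in the same format as the one in the report.
    static int bind(String attr, List<String> childOutput) {
        int ordinal = childOutput.indexOf(attr);
        if (ordinal < 0) {
            throw new RuntimeException(
                    "Couldn't find " + attr + " in [" + String.join(",", childOutput) + "]");
        }
        return ordinal;
    }

    public static void main(String[] args) {
        List<String> childOutput = List.of("_gen_alias_28#28", "_gen_alias_29#29");
        // An attribute the child does produce binds to its position
        System.out.println(bind("_gen_alias_29#29", childOutput));
        try {
            // An attribute the child does not produce cannot be bound
            bind("_gen_alias_30#30", childOutput);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the second call fails because the consumer still refers to an alias that its input no longer carries, which is the mismatch the exception reports.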