You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Ayrat Sadreev (Jira)" <ji...@apache.org> on 2020/08/10 11:14:00 UTC

[jira] [Created] (SPARK-32580) Issue accessing a column values after 'explode' function

Ayrat Sadreev created SPARK-32580:
-------------------------------------

             Summary: Issue accessing a column values after 'explode' function
                 Key: SPARK-32580
                 URL: https://issues.apache.org/jira/browse/SPARK-32580
             Project: Spark
          Issue Type: Bug
          Components: Spark Core, SQL
    Affects Versions: 3.0.0
            Reporter: Ayrat Sadreev


An exception occurs when trying to flatten doubly nested arrays.

The input schema is:
{code:none}
root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- timestamp: string (nullable = true)
 |    |    |-- values: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- sample: double (nullable = true)
{code}
The target schema is:
{code:none}
root
 |-- item_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- sample: double (nullable = true)
{code}
The code (in Java):
{code:java}
package com.skf.streamer.spark;

import java.util.concurrent.TimeoutException;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Minimal reproduction for SPARK-32580.
 *
 * <p>The program does the following:
 * <ul>
 *   <li>reads {@code src/test/resources/explode/data.json} with an explicit schema;</li>
 *   <li>explodes the outer {@code data} array into an {@code item} column;</li>
 *   <li>explodes {@code item.values} into a {@code value} column;</li>
 *   <li>projects {@code item.item_id}, {@code item.timestamp} and {@code value.sample}.</li>
 * </ul>
 * On Spark 3.0.0 the final {@code show()} is reported to fail with
 * {@code Couldn't find _gen_alias_N in [...]}.
 */
public class ExplodeTest {

   /**
    * Runs the reproduction on a {@code local[1]} master and always stops the session.
    *
    * @param args unused
    * @throws TimeoutException retained from the original signature; none of the calls made
    *     here appear to throw it
    */
   public static void main(String[] args) throws TimeoutException {

      SparkConf conf = new SparkConf()
         .setAppName("SimpleApp")
         .set("spark.scheduler.mode", "FAIR")
         .set("spark.master", "local[1]")
         .set("spark.sql.streaming.checkpointLocation", "checkpoint");

      SparkSession spark = SparkSession.builder()
         .config(conf)
         .getOrCreate();

      // show() is expected to throw in this reproduction. The finally block guarantees that
      // the session is still stopped instead of being left running.
      try {
         Dataset<Row> d0 = spark
            .read()
            .format("json")
            // The fixture is one pretty-printed JSON document spanning many lines.
            .option("multiLine", "true")
            .schema(getSchema())
            .load("src/test/resources/explode/data.json");

         d0.printSchema();

         // One row per element of the outer "data" array.
         d0 = d0.withColumn("item", functions.explode(d0.col("data")));
         // One row per element of that item's inner "values" array.
         d0 = d0.withColumn("value", functions.explode(d0.col("item.values")));
         d0.printSchema();

         // Flatten: take fields from both the outer and the inner exploded structs.
         d0 = d0.select(
            d0.col("item.item_id"),
            d0.col("item.timestamp"),
            d0.col("value.sample")
         );

         d0.printSchema();

         // Fails on Spark 3.0.0 with "Couldn't find _gen_alias_N in [...]" (SPARK-32580).
         d0.show();
      } finally {
         spark.stop();
      }
   }

   /**
    * Builds the explicit read schema.
    *
    * <p>The resulting shape is:
    * {@code data: array<struct<item_id: string, timestamp: string,
    * values: array<struct<sample: double>>>>}.
    *
    * <p>NOTE(review): every field is declared non-nullable here. However, the schema printed
    * in the report shows {@code nullable = true} throughout, so the JSON source appears not to
    * honour these flags. Confirm this.
    *
    * @return the root struct type passed to {@code DataFrameReader.schema}
    */
   private static StructType getSchema() {
      // Innermost element of "values": struct<sample: double>.
      StructField[] level2Fields = {
         DataTypes.createStructField("sample", DataTypes.DoubleType, false),
      };

      // Element of "data": struct<item_id, timestamp, values: array<level2>>.
      StructField[] level1Fields = {
         DataTypes.createStructField("item_id", DataTypes.StringType, false),
         DataTypes.createStructField("timestamp", DataTypes.StringType, false),
         DataTypes.createStructField("values", DataTypes.createArrayType(DataTypes.createStructType(level2Fields)), false)
      };

      // Root: struct<data: array<level1>>.
      StructField[] fields = {
         DataTypes.createStructField("data", DataTypes.createArrayType(DataTypes.createStructType(level1Fields)), false)
      };

      return DataTypes.createStructType(fields);
   }
}
{code}
The data file:
{code:json}
{
  "data": [
    {
      "item_id": "item_1",
      "timestamp": "2020-07-01 12:34:89",
      "values": [
        {
          "sample": 1.1
        },
        {
          "sample": 1.2
        }
      ]
    },
    {
      "item_id": "item_2",
      "timestamp": "2020-07-02 12:34:89",
      "values": [
        {
          "sample": 2.2
        }
      ]
    }
  ]
}
{code}
The call to Dataset.show() fails with the following exception:
{code:none}
Caused by: java.lang.RuntimeException: Couldn't find _gen_alias_30#30 in [_gen_alias_28#28,_gen_alias_29#29]
	at scala.sys.package$.error(package.scala:30)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.$anonfun$applyOrElse$1(BoundAttribute.scala:81)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 37 more
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org