You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "luca1x (via GitHub)" <gi...@apache.org> on 2024/04/19 15:15:15 UTC

[I] Iceberg Hidden Partitioning and Spark SQL Wide Transformation Optimization [iceberg]

luca1x opened a new issue, #10187:
URL: https://github.com/apache/iceberg/issues/10187

   ### Query engine
   
   Spark 3.5
   
   ### Question
   
   Hi,
   
   We are trying out Apache Iceberg on Glue/S3 in conjunction with Apache Spark. We are using version 3.5
   As a small POC, we created an Iceberg table where we store some events in parquet format:
   
   ```
   val partitionSpec = PartitionSpec.builderFor(schema)
       .identity("date")
       .bucket("user_uid", 2048)
       .build()
   
   val tableProperties: Map[String, String] = Map(
       "format-version" -> "2",
       "history.expire.max-snapshot-age-ms" -> (7.days).toMillis.toString,
     )
   
   val table = glueCatalog.createTable(
         TableIdentifier.of(Namespace.of("db"), "events"),
         eventSchemaDefinedWithIcebergDsl,
         partitionSpec,
         tableProperties.asJava
       )
   table.replaceSortOrder().asc("user_uid").commit()
   
   ```
   
   The idea here is to store all events partitioned by date and subpartitioned (hash) by UID. 
   We populate this table with some event data using a dedicated job. So far, so good.
   Our goal is now to aggregate these events using Spark and dumping them into a second Iceberg table.
   This second aggregate table has the same properties as the events table, in particular the 
   partitioning scheme definition is exactly the same as above!
   
   We then try to aggregate some of the events using this simple Spark SQL query.
   ```
   
   // Event is a case class defined by us that maps to Iceberg input state
   // analogously, State is a case class that maps to Iceberg output state
   spark.read.format("iceberg")
      .load("events").as[Event] 
      .where(col("date").between(startDate, endDate)) // this prunes dates correctly!
      .groupBy(col("user_uid"))
      .as[String, Event] 
      .mapValues(toInitialState(_)) // toInitialState maps from Event -> State
      .reduceGroups((s1, s2) => updateState(s1, s2)) // updateState merges two State objects
      .map{case (k,s) => s}
      .writeTo("states") 
      .append() 
         
   ```
   Our hope here was that Spark would recognize that the `groupBy` does not induce a shuffle, since 
   input data is already partitioned by `user_uid` by Iceberg, provided that Spark picks up this partitioning. 
   However, this was not the case, and Spark issued a reshuffle both for the groupBy and the final write into the result table.
   
   We tried to nudge Spark into the right direction by playing around with various parameters, e.g. 
`user_uid` partition size, max input file size, trying out variations of the above query or by instructing/forcing Spark to use the uid partitioning scheme:
   
   ```
   spark.read.format("iceberg")
      .load("events").as[Event] 
      .where(col("date").between(startDate, endDate)) // let's assume we're loading 3 days of events here
      .repartition(2048, col("user_uid"))
      .sortWithinPartitions()
   
   ```
   or via 
   `repartition(bucket(2048, col("user_uid")))`, which did not work with the error `org.apache.spark.SparkException: [INTERNAL_ERROR] Cannot generate code for expression: bucket(2048, input[4, string, false])`.
   
   Our input data for this task was about 35GB of data per day, which would result in 35 MB of data per input partition for 2048 partitions and 2 days of events, which also seems reasonable. No matter what we do, Spark does not assign a correct partitioner by `user_uid` when reading the data, as our experiments confirmed multiple times.
   
   My question:
   Is fusing stages based on partitioned input data like in the above scenario something Spark and Iceberg are expected to be capable of? What about Spark choosing an appropriate partitioner based on hidden partitioning? If so, we would like to find out what should improve in our setup. 
   
   We hope that resolving this results in a speedup of the query, which is important for us when adopting Iceberg. We are running these kinds of tasks as batch jobs and we would need to be as efficient as possible.
   
   Thanks a lot for your help, I can also provide you with more info if needed.
   
   Cheers, Luca


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org