Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/01/21 17:58:32 UTC

[GitHub] [iceberg] RussellSpitzer commented on issue #2131: ExpireSnapshots deletes active files

RussellSpitzer commented on issue #2131:
URL: https://github.com/apache/iceberg/issues/2131#issuecomment-764830749


   Have you tried the ExpireSnapshotsAction? It has a slightly different internal implementation, so I think it will actually preserve the files even with this procedure. That said, I think your concerns are all very valid.
   
   > On Jan 21, 2021, at 11:56 AM, Scott Kruger <no...@github.com> wrote:
   > 
   > 
   > It's possible to use the API to construct snapshots in such a way that expiring snapshots (with file deletion enabled) causes active data files to be deleted. This happens with an iceberg table that's manually managed over raw parquet files written by spark (doesn't really bear going into why). The basic steps are:
   > 
   > 1. Create a partitioned iceberg table
   > 2. Write two partitions (p1 and p2) as raw parquet data via spark
   > 3. Append files to iceberg table
   > 4. IMPORTANT: Commit iceberg overwrite that
   >    - Deletes files appended in step 3
   >    - Re-adds those same files
   > 5. Expire snapshot 1 with file deletion enabled
   > 6. Write raw parquet data to a new directory containing data for partitions p2 and p3 (note that p2 is the same partition as in step 2)
   > 7. Commit iceberg overwrite that
   >    - Deletes files in snapshot 2 from partition p2
   >    - Adds all new files from step 6
   > 8. Expire snapshot 2 with file deletion enabled
   > 9. Reading the iceberg table now fails because the files from p1, which are still active files, were deleted by the snapshot expiration in step 8
   > 
   > Here's a script that shows how to reproduce:
   > 
   > // vim: set ts=2 sw=2 bs=2 et
   > 
   > // Use Vimux bindings to evaluate expressions in the shell pane. For example:
   > //
   > // * In normal mode, use \vs to evaluate the block under the cursor
   > // * In (block) visual mode, use \vs to evaluate the highlighted block
   > 
   > import java.sql.{Date,Timestamp}
   > import java.time.LocalDate
   > import org.apache.hadoop.fs._
   > import org.apache.spark.sql._
   > import org.apache.spark.sql.types._
   > import org.apache.spark.sql.functions._
   > import org.apache.spark.sql.expressions._
   > import scala.collection.JavaConversions._
   > import spark.implicits._
   > import org.apache.iceberg.hadoop.{HadoopTables,HadoopInputFile}
   > import org.apache.iceberg.spark.SparkSchemaUtil
   > import org.apache.iceberg.parquet.ParquetUtil
   > import org.apache.iceberg.{PartitionSpec,DataFiles,MetricsConfig}
   > 
   > val tableDir = "hdfs:///tmp/iceberg-table"
   > 
   > val fs = new Path(tableDir).getFileSystem(sc.hadoopConfiguration)
   > fs.delete(new Path(tableDir), true)
   > 
   > val tables = new HadoopTables(sc.hadoopConfiguration)
   > 
   > // Create simple table partitioned by day
   > val df1 = (
   >   spark
   >     .range(10L)
   >     .select(
   >       'id,
   >       concat(lit("value"), 'id) as 'value,
   >       when('id < 5L, Timestamp.valueOf("2021-01-19 00:00:00")).otherwise(Timestamp.valueOf("2021-01-20 00:00:00")) as 'ts
   >     )
   > )
   > 
   > val schema = SparkSchemaUtil.convert(df1.schema)
   > val table = tables.create(
   >   schema,
   >   PartitionSpec.builderFor(schema).day("ts").build,
   >   tableDir
   > )
   > 
   > // Get data files from a path
   > def getDataFiles(path: String) = {
   >   fs
   >     .globStatus(new Path(path, "*/*.parquet"))
   >     .map({ status => HadoopInputFile.fromStatus(status, sc.hadoopConfiguration) })
   >     .map({ inputFile =>
   >       DataFiles
   >         .builder(table.spec)
   >         .withInputFile(inputFile)
   >         .withMetrics(ParquetUtil.fileMetrics(inputFile, MetricsConfig.getDefault))
   >         .withPartitionPath(new Path(inputFile.location).getParent.getName)
   >         .build
   >     })
   >     .toSeq
   > }
   > 
   > // Write dataframe as raw parquet
   > (
   >   df1
   >     .withColumn("ts_day", date_format('ts, "yyyy-MM-dd"))
   >     .repartition(2)
   >     .sortWithinPartitions('ts)
   >     .write
   >     .partitionBy("ts_day")
   >     .mode("overwrite")
   >     .parquet(s"$tableDir/data/commit1")
   > )
   > 
   > // Append data files to iceberg table
   > val dataFiles = getDataFiles(s"$tableDir/data/commit1")
   > 
   > val append = table.newFastAppend
   > dataFiles.foreach(append.appendFile)
   > append.commit
   > table.refresh
   > 
   > // Table data appears OK
   > spark.read.format("iceberg").load(tableDir).show
   > 
   > // Issue an overwrite in which the appended datafiles are deleted, then re-added
   > val overwrite = table.newOverwrite
   > dataFiles.foreach(overwrite.deleteFile)
   > dataFiles.foreach(overwrite.addFile)
   > overwrite.commit
   > table.refresh
   > 
   > // Table data appears OK
   > spark.read.format("iceberg").load(tableDir).show
   > 
   > // Expire first snapshot (append) with file cleanup enabled
   > table.expireSnapshots.expireSnapshotId(table.snapshots.head.snapshotId).cleanExpiredFiles(true).commit
   > table.refresh
   > 
   > // Table data appears OK
   > spark.read.format("iceberg").load(tableDir).show
   > 
   > // Write new parquet data, with one new (2021-01-21) and one overwritten (2021-01-20) partition
   > (
   >   spark
   >     .range(5L, 15L)
   >     .select(
   >       'id,
   >       concat(lit("value"), 'id) as 'value,
   >       when('id < 10L, Timestamp.valueOf("2021-01-20 00:00:00")).otherwise(Timestamp.valueOf("2021-01-21 00:00:00")) as 'ts
   >     )
   >     .withColumn("ts_day", date_format('ts, "yyyy-MM-dd"))
   >     .repartition(2)
   >     .sortWithinPartitions('ts)
   >     .write
   >     .partitionBy("ts_day")
   >     .mode("overwrite")
   >     .parquet(s"$tableDir/data/commit2")
   > )
   > 
   > // Do an overwrite that deletes the old files from the overwritten partition
   > // (2021-01-20) and adds the files we just wrote for the overwritten and new
   > // partitions
   > //
   > val dataFiles2 = getDataFiles(s"$tableDir/data/commit2")
   > 
   > val overwrite = table.newOverwrite
   > dataFiles.filter({ file => LocalDate.ofEpochDay(file.partition.get(0, classOf[Integer]).toLong) == LocalDate.of(2021, 1, 20) }).foreach(overwrite.deleteFile)
   > dataFiles2.foreach(overwrite.addFile)
   > overwrite.commit
   > table.refresh
   > 
   > // Expire the second commit (the first overwrite)
   > table.expireSnapshots.expireSnapshotId(table.snapshots.head.snapshotId).cleanExpiredFiles(true).commit
   > table.refresh
   > 
   > // Throws an exception because the files from the original commit for the
   > // partition 2021-01-19 have been deleted, even though they were not affected
   > // by the most recent overwrite
   > spark.read.format("iceberg").load(tableDir).show
   > 
   > Clearly there's user error here (we shouldn't be deleting and re-adding the same files added in the previous snapshot), but it feels like iceberg is doing the wrong thing as well, as it deletes files that it still considers active. It feels like the right solution is one of the following:
   > 
   > 1. Reject the commit in step 4 with an exception
   > 2. Warn the user that they're trying to both add and delete the same files and silently remove the affected files from the delete list
   > 3. Detect during the expiration that the files to be deleted are still active and prevent them from getting deleted
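To make the fixes proposed in the quoted report concrete, here is a minimal sketch of two of them: spotting a commit that both adds and deletes the same files, and refusing at expiration time to remove files that a retained snapshot still references. It is a simplified model and does not use Iceberg's real classes or its actual expiration algorithm. `Snapshot`, `ExpireModel` and all method names are hypothetical, and files are represented by plain path strings.

```scala
// Simplified, hypothetical model. These are NOT Iceberg classes.
// A snapshot records the files it added, the files it deleted, and the
// full set of files that are live once it has been committed.
final case class Snapshot(
  id: Long,
  added: Set[String],
  deleted: Set[String],
  live: Set[String]
)

object ExpireModel {
  // Files that a single commit both adds and deletes (the pattern in step 4
  // of the report). A commit-time check could reject or warn on a non-empty result.
  def addedAndDeleted(snapshot: Snapshot): Set[String] =
    snapshot.added intersect snapshot.deleted

  // Naive cleanup: every file an expired snapshot marked as deleted is
  // treated as safe to remove from storage.
  def naiveFilesToRemove(expired: Seq[Snapshot]): Set[String] =
    expired.flatMap(_.deleted).toSet

  // Guarded cleanup: start from the naive list, then drop any file that a
  // retained snapshot still lists as live.
  def guardedFilesToRemove(expired: Seq[Snapshot], retained: Seq[Snapshot]): Set[String] = {
    val stillLive = retained.flatMap(_.live).toSet
    naiveFilesToRemove(expired) -- stillLive
  }
}
```

If snapshot 2 is modelled as adding and deleting the same two files (one each for p1 and p2), and snapshot 3 as keeping the p1 file live while replacing the p2 file, then the naive cleanup of snapshot 2 selects the still-active p1 file for removal, whereas the guarded version selects only the replaced p2 file. The guard costs an extra pass over the retained snapshots' live files, which in a real table would mean reading their manifests.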
   
   

