You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2023/04/28 00:33:42 UTC

[iceberg] branch master updated: Delta: Add version and timestamp tags for each Delta Lake transaction (#7450)

This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new b1d25db291 Delta: Add version and timestamp tags for each Delta Lake transaction (#7450)
b1d25db291 is described below

commit b1d25db291c837ead3e90a0aa259ac522fbc4441
Author: Jonas(Rushan) Jiang <ru...@andrew.cmu.edu>
AuthorDate: Thu Apr 27 20:33:36 2023 -0400

    Delta: Add version and timestamp tags for each Delta Lake transaction (#7450)
---
 .../iceberg/delta/TestSnapshotDeltaLakeTable.java  | 45 ++++++++++++++++++++++
 .../delta/BaseSnapshotDeltaLakeTableAction.java    | 24 ++++++++++++
 2 files changed, 69 insertions(+)

diff --git a/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java b/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java
index 457e4a81ca..165ee3e894 100644
--- a/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java
+++ b/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java
@@ -33,16 +33,20 @@ import java.io.IOException;
 import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.sql.Timestamp;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.stream.Collectors;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.net.URLCodec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.util.LocationUtil;
@@ -173,6 +177,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
             .execute();
 
     checkSnapshotIntegrity(partitionedLocation, partitionedIdentifier, newTableIdentifier, result);
+    checkTagContentAndOrder(partitionedLocation, newTableIdentifier, 0);
     checkIcebergTableLocation(newTableIdentifier, partitionedLocation);
   }
 
@@ -193,6 +198,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
 
     checkSnapshotIntegrity(
         unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result);
+    checkTagContentAndOrder(unpartitionedLocation, newTableIdentifier, 0);
     checkIcebergTableLocation(newTableIdentifier, unpartitionedLocation);
   }
 
@@ -214,6 +220,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
             .execute();
 
     checkSnapshotIntegrity(partitionedLocation, partitionedIdentifier, newTableIdentifier, result);
+    checkTagContentAndOrder(partitionedLocation, newTableIdentifier, 0);
     checkIcebergTableLocation(newTableIdentifier, newIcebergTableLocation);
   }
 
@@ -245,6 +252,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
 
     checkSnapshotIntegrity(
         unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result);
+    checkTagContentAndOrder(unpartitionedLocation, newTableIdentifier, 0);
     checkIcebergTableLocation(newTableIdentifier, unpartitionedLocation);
     checkIcebergTableProperties(
         newTableIdentifier,
@@ -278,6 +286,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
             .execute();
     checkSnapshotIntegrity(
         externalDataFilesTableLocation, externalDataFilesIdentifier, newTableIdentifier, result);
+    checkTagContentAndOrder(externalDataFilesTableLocation, newTableIdentifier, 0);
     checkIcebergTableLocation(newTableIdentifier, externalDataFilesTableLocation);
     checkDataFilePathsIntegrity(newTableIdentifier, externalDataFilesTableLocation);
   }
@@ -295,6 +304,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
             .tableProperty(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false")
             .execute();
     checkSnapshotIntegrity(typeTestTableLocation, typeTestIdentifier, newTableIdentifier, result);
+    checkTagContentAndOrder(typeTestTableLocation, newTableIdentifier, 0);
     checkIcebergTableLocation(newTableIdentifier, typeTestTableLocation);
     checkIcebergTableProperties(
         newTableIdentifier,
@@ -334,6 +344,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
     checkSnapshotIntegrity(
         vacuumTestTableLocation, vacuumTestIdentifier, newTableIdentifier, result);
     checkIcebergTableLocation(newTableIdentifier, vacuumTestTableLocation);
+    checkTagContentAndOrder(vacuumTestTableLocation, newTableIdentifier, 13);
   }
 
   @Test
@@ -366,6 +377,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
             .execute();
     checkSnapshotIntegrity(
         logCleanTestTableLocation, logCleanTestIdentifier, newTableIdentifier, result);
+    checkTagContentAndOrder(logCleanTestTableLocation, newTableIdentifier, 10);
     checkIcebergTableLocation(newTableIdentifier, logCleanTestTableLocation);
   }
 
@@ -388,6 +400,39 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
         .containsExactlyInAnyOrderElementsOf(deltaTableContents);
   }
 
+  private void checkTagContentAndOrder(
+      String deltaTableLocation, String icebergTableIdentifier, long firstConstructableVersion) {
+    DeltaLog deltaLog = DeltaLog.forTable(spark.sessionState().newHadoopConf(), deltaTableLocation);
+    long currentVersion = deltaLog.snapshot().getVersion();
+    Table icebergTable = getIcebergTable(icebergTableIdentifier);
+    Map<String, SnapshotRef> icebergSnapshotRefs = icebergTable.refs();
+    List<Snapshot> icebergSnapshots = Lists.newArrayList(icebergTable.snapshots());
+    // Expect one Iceberg snapshot per Delta version from firstConstructableVersion to current.
+    Assertions.assertThat(icebergSnapshots.size())
+        .isEqualTo(currentVersion - firstConstructableVersion + 1);
+    // Snapshot i must carry the version tag and the timestamp tag of Delta version start + i.
+    for (int i = 0; i < icebergSnapshots.size(); i++) {
+      long deltaVersion = firstConstructableVersion + i;
+      Snapshot currentIcebergSnapshot = icebergSnapshots.get(i);
+
+      String expectedVersionTag = "delta-version-" + deltaVersion;
+      SnapshotRef versionRef = icebergSnapshotRefs.get(expectedVersionTag);
+      Assertions.assertThat(versionRef).isNotNull();
+      Assertions.assertThat(versionRef.isTag()).isTrue();
+      Assertions.assertThat(versionRef.snapshotId())
+          .isEqualTo(currentIcebergSnapshot.snapshotId());
+
+      Timestamp deltaVersionTimestamp = deltaLog.getCommitInfoAt(deltaVersion).getTimestamp();
+      Assertions.assertThat(deltaVersionTimestamp).isNotNull();
+      String expectedTimestampTag = "delta-ts-" + deltaVersionTimestamp.getTime();
+      SnapshotRef timestampRef = icebergSnapshotRefs.get(expectedTimestampTag);
+      Assertions.assertThat(timestampRef).isNotNull();
+      Assertions.assertThat(timestampRef.isTag()).isTrue();
+      Assertions.assertThat(timestampRef.snapshotId())
+          .isEqualTo(currentIcebergSnapshot.snapshotId());
+    }
+  }
+
   private void checkIcebergTableLocation(String icebergTableIdentifier, String expectedLocation) {
     Table icebergTable = getIcebergTable(icebergTableIdentifier);
     Assertions.assertThat(icebergTable.location())
diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java
index c479fd2a3b..abc2708b21 100644
--- a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java
+++ b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java
@@ -26,6 +26,7 @@ import io.delta.standalone.actions.RemoveFile;
 import io.delta.standalone.exceptions.DeltaStandaloneException;
 import java.io.File;
 import java.net.URI;
+import java.sql.Timestamp;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +39,7 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManageSnapshots;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.OverwriteFiles;
@@ -79,6 +81,8 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
   private static final String DELTA_SOURCE_VALUE = "delta";
   private static final String ORIGINAL_LOCATION_PROP = "original_location";
   private static final String PARQUET_SUFFIX = ".parquet";
+  private static final String DELTA_VERSION_TAG_PREFIX = "delta-version-";
+  private static final String DELTA_TIMESTAMP_TAG_PREFIX = "delta-ts-";
   private final ImmutableMap.Builder<String, String> additionalPropertiesBuilder =
       ImmutableMap.builder();
   private DeltaLog deltaLog;
@@ -253,6 +257,7 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
         AppendFiles appendFiles = transaction.newAppend();
         filesToAdd.forEach(appendFiles::appendFile);
         appendFiles.commit();
+        tagCurrentSnapshot(constructableStartVersion, transaction);
 
         return constructableStartVersion;
       } catch (NotFoundException | IllegalArgumentException | DeltaStandaloneException e) {
@@ -318,7 +323,12 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
       DeleteFiles deleteFiles = transaction.newDelete();
       filesToRemove.forEach(deleteFiles::deleteFile);
       deleteFiles.commit();
+    } else {
+      // No data change case, dummy append to tag the snapshot
+      transaction.newAppend().commit();
     }
+
+    tagCurrentSnapshot(versionLog.getVersion(), transaction);
   }
 
   private DataFile buildDataFileFromAction(Action action, Table table) {
@@ -412,6 +422,20 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
     return additionalPropertiesBuilder.build();
   }
 
+  private void tagCurrentSnapshot(long deltaVersion, Transaction transaction) {
+    // Both tags reference the snapshot that is current in the transaction's table view.
+    long snapshotId = transaction.table().currentSnapshot().snapshotId();
+    ManageSnapshots tagUpdates = transaction.manageSnapshots();
+    tagUpdates.createTag(DELTA_VERSION_TAG_PREFIX + deltaVersion, snapshotId);
+
+    // The timestamp tag is only created when the Delta commit info provides a timestamp.
+    Timestamp commitTimestamp = deltaLog.getCommitInfoAt(deltaVersion).getTimestamp();
+    if (commitTimestamp != null) {
+      tagUpdates.createTag(DELTA_TIMESTAMP_TAG_PREFIX + commitTimestamp.getTime(), snapshotId);
+    }
+    tagUpdates.commit();
+  }
+
   /**
    * Get the full file path, the input {@code String} path can be either a relative path or an
    * absolute path of a data file in delta table