You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2023/04/28 00:33:42 UTC
[iceberg] branch master updated: Delta: Add version and timestamp tags for each Delta Lake transaction (#7450)
This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new b1d25db291 Delta: Add version and timestamp tags for each Delta Lake transaction (#7450)
b1d25db291 is described below
commit b1d25db291c837ead3e90a0aa259ac522fbc4441
Author: Jonas(Rushan) Jiang <ru...@andrew.cmu.edu>
AuthorDate: Thu Apr 27 20:33:36 2023 -0400
Delta: Add version and timestamp tags for each Delta Lake transaction (#7450)
---
.../iceberg/delta/TestSnapshotDeltaLakeTable.java | 45 ++++++++++++++++++++++
.../delta/BaseSnapshotDeltaLakeTableAction.java | 24 ++++++++++++
2 files changed, 69 insertions(+)
diff --git a/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java b/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java
index 457e4a81ca..165ee3e894 100644
--- a/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java
+++ b/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java
@@ -33,16 +33,20 @@ import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.net.URLCodec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.util.LocationUtil;
@@ -173,6 +177,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
.execute();
checkSnapshotIntegrity(partitionedLocation, partitionedIdentifier, newTableIdentifier, result);
+ checkTagContentAndOrder(partitionedLocation, newTableIdentifier, 0);
checkIcebergTableLocation(newTableIdentifier, partitionedLocation);
}
@@ -193,6 +198,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
checkSnapshotIntegrity(
unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result);
+ checkTagContentAndOrder(unpartitionedLocation, newTableIdentifier, 0);
checkIcebergTableLocation(newTableIdentifier, unpartitionedLocation);
}
@@ -214,6 +220,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
.execute();
checkSnapshotIntegrity(partitionedLocation, partitionedIdentifier, newTableIdentifier, result);
+ checkTagContentAndOrder(partitionedLocation, newTableIdentifier, 0);
checkIcebergTableLocation(newTableIdentifier, newIcebergTableLocation);
}
@@ -245,6 +252,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
checkSnapshotIntegrity(
unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result);
+ checkTagContentAndOrder(unpartitionedLocation, newTableIdentifier, 0);
checkIcebergTableLocation(newTableIdentifier, unpartitionedLocation);
checkIcebergTableProperties(
newTableIdentifier,
@@ -278,6 +286,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
.execute();
checkSnapshotIntegrity(
externalDataFilesTableLocation, externalDataFilesIdentifier, newTableIdentifier, result);
+ checkTagContentAndOrder(externalDataFilesTableLocation, newTableIdentifier, 0);
checkIcebergTableLocation(newTableIdentifier, externalDataFilesTableLocation);
checkDataFilePathsIntegrity(newTableIdentifier, externalDataFilesTableLocation);
}
@@ -295,6 +304,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
.tableProperty(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false")
.execute();
checkSnapshotIntegrity(typeTestTableLocation, typeTestIdentifier, newTableIdentifier, result);
+ checkTagContentAndOrder(typeTestTableLocation, newTableIdentifier, 0);
checkIcebergTableLocation(newTableIdentifier, typeTestTableLocation);
checkIcebergTableProperties(
newTableIdentifier,
@@ -334,6 +344,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
checkSnapshotIntegrity(
vacuumTestTableLocation, vacuumTestIdentifier, newTableIdentifier, result);
checkIcebergTableLocation(newTableIdentifier, vacuumTestTableLocation);
+ checkTagContentAndOrder(vacuumTestTableLocation, newTableIdentifier, 13);
}
@Test
@@ -366,6 +377,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
.execute();
checkSnapshotIntegrity(
logCleanTestTableLocation, logCleanTestIdentifier, newTableIdentifier, result);
+ checkTagContentAndOrder(logCleanTestTableLocation, newTableIdentifier, 10);
checkIcebergTableLocation(newTableIdentifier, logCleanTestTableLocation);
}
@@ -388,6 +400,39 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
.containsExactlyInAnyOrderElementsOf(deltaTableContents);
}
+  /**
+   * Verifies that every Iceberg snapshot produced from the Delta log is tagged with its Delta
+   * version ({@code delta-version-<version>}) and with its Delta commit timestamp in epoch millis
+   * ({@code delta-ts-<millis>}), and that both tags point at that snapshot.
+   *
+   * <p>NOTE: assumes {@code icebergTable.snapshots()} iterates in commit order, so that the i-th
+   * snapshot corresponds to Delta version {@code firstConstructableVersion + i}.
+   *
+   * @param deltaTableLocation location of the source Delta table
+   * @param icebergTableIdentifier identifier of the snapshotted Iceberg table
+   * @param firstConstructableVersion first Delta version expected to have an Iceberg snapshot
+   */
+  private void checkTagContentAndOrder(
+      String deltaTableLocation, String icebergTableIdentifier, long firstConstructableVersion) {
+    DeltaLog deltaLog = DeltaLog.forTable(spark.sessionState().newHadoopConf(), deltaTableLocation);
+    long currentVersion = deltaLog.snapshot().getVersion();
+    Table icebergTable = getIcebergTable(icebergTableIdentifier);
+    Map<String, SnapshotRef> icebergSnapshotRefs = icebergTable.refs();
+    List<Snapshot> icebergSnapshots = Lists.newArrayList(icebergTable.snapshots());
+
+    // Exactly one Iceberg snapshot per Delta version in [firstConstructableVersion, currentVersion].
+    Assertions.assertThat(icebergSnapshots.size())
+        .isEqualTo(currentVersion - firstConstructableVersion + 1);
+
+    for (int i = 0; i < icebergSnapshots.size(); i++) {
+      long deltaVersion = firstConstructableVersion + i;
+      long expectedSnapshotId = icebergSnapshots.get(i).snapshotId();
+
+      assertTagPointsTo(icebergSnapshotRefs, "delta-version-" + deltaVersion, expectedSnapshotId);
+
+      Timestamp deltaVersionTimestamp = deltaLog.getCommitInfoAt(deltaVersion).getTimestamp();
+      Assertions.assertThat(deltaVersionTimestamp).isNotNull();
+      assertTagPointsTo(
+          icebergSnapshotRefs, "delta-ts-" + deltaVersionTimestamp.getTime(), expectedSnapshotId);
+    }
+  }
+
+  /**
+   * Asserts that {@code tagName} is present in {@code refs}, is a tag (not a branch), and targets
+   * {@code snapshotId}.
+   */
+  private static void assertTagPointsTo(
+      Map<String, SnapshotRef> refs, String tagName, long snapshotId) {
+    SnapshotRef ref = refs.get(tagName);
+    Assertions.assertThat(ref).as("ref %s", tagName).isNotNull();
+    Assertions.assertThat(ref.isTag()).as("ref %s is a tag", tagName).isTrue();
+    Assertions.assertThat(ref.snapshotId())
+        .as("snapshot id of tag %s", tagName)
+        .isEqualTo(snapshotId);
+  }
+
private void checkIcebergTableLocation(String icebergTableIdentifier, String expectedLocation) {
Table icebergTable = getIcebergTable(icebergTableIdentifier);
Assertions.assertThat(icebergTable.location())
diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java
index c479fd2a3b..abc2708b21 100644
--- a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java
+++ b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java
@@ -26,6 +26,7 @@ import io.delta.standalone.actions.RemoveFile;
import io.delta.standalone.exceptions.DeltaStandaloneException;
import java.io.File;
import java.net.URI;
+import java.sql.Timestamp;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -38,6 +39,7 @@ import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManageSnapshots;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.OverwriteFiles;
@@ -79,6 +81,8 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
private static final String DELTA_SOURCE_VALUE = "delta";
private static final String ORIGINAL_LOCATION_PROP = "original_location";
private static final String PARQUET_SUFFIX = ".parquet";
+ // Tag-name prefixes used by tagCurrentSnapshot. Each converted snapshot is tagged
+ // "delta-version-<delta version>" and, when the Delta commit info carries a timestamp,
+ // "delta-ts-<commit timestamp in epoch millis>".
+ private static final String DELTA_VERSION_TAG_PREFIX = "delta-version-";
+ private static final String DELTA_TIMESTAMP_TAG_PREFIX = "delta-ts-";
private final ImmutableMap.Builder<String, String> additionalPropertiesBuilder =
ImmutableMap.builder();
private DeltaLog deltaLog;
@@ -253,6 +257,7 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
AppendFiles appendFiles = transaction.newAppend();
filesToAdd.forEach(appendFiles::appendFile);
appendFiles.commit();
+ tagCurrentSnapshot(constructableStartVersion, transaction);
return constructableStartVersion;
} catch (NotFoundException | IllegalArgumentException | DeltaStandaloneException e) {
@@ -318,7 +323,12 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
DeleteFiles deleteFiles = transaction.newDelete();
filesToRemove.forEach(deleteFiles::deleteFile);
deleteFiles.commit();
+ } else {
+ // No data change case, dummy append to tag the snapshot
+ transaction.newAppend().commit();
}
+
+ tagCurrentSnapshot(versionLog.getVersion(), transaction);
}
private DataFile buildDataFileFromAction(Action action, Table table) {
@@ -412,6 +422,20 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
return additionalPropertiesBuilder.build();
}
+  /**
+   * Tags the transaction's current snapshot with the Delta version it was converted from and, when
+   * the Delta commit info carries a timestamp, with that commit timestamp in epoch millis.
+   *
+   * <p>Distinct Delta versions may share a commit timestamp at millisecond granularity. Creating a
+   * second tag with the same name would fail the whole conversion, so in that case the timestamp
+   * tag stays on the earliest such version and only the version tag is added.
+   *
+   * @param deltaVersion the Delta version whose changes produced the current snapshot
+   * @param transaction the Iceberg transaction the snapshot was committed to
+   */
+  private void tagCurrentSnapshot(long deltaVersion, Transaction transaction) {
+    long currentSnapshotId = transaction.table().currentSnapshot().snapshotId();
+
+    ManageSnapshots manageSnapshots = transaction.manageSnapshots();
+    manageSnapshots.createTag(DELTA_VERSION_TAG_PREFIX + deltaVersion, currentSnapshotId);
+
+    Timestamp deltaVersionTimestamp = deltaLog.getCommitInfoAt(deltaVersion).getTimestamp();
+    if (deltaVersionTimestamp != null) {
+      String timestampTag = DELTA_TIMESTAMP_TAG_PREFIX + deltaVersionTimestamp.getTime();
+      // refs() reflects tags committed earlier in this transaction; skip a colliding name.
+      if (!transaction.table().refs().containsKey(timestampTag)) {
+        manageSnapshots.createTag(timestampTag, currentSnapshotId);
+      }
+    }
+    manageSnapshots.commit();
+  }
+
/**
* Get the full file path, the input {@code String} path can be either a relative path or an
* absolute path of a data file in delta table