You are viewing a plain-text version of this content. The canonical version is available in the original mailing-list archive; its hyperlink was not preserved in this plain-text copy.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2023/04/28 15:43:05 UTC
[iceberg] branch master updated: Delta: Fix snapshotDataFilesCount and use Immutable for result (#7454)
This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 5f156d73bd Delta: Fix snapshotDataFilesCount and use Immutable for result (#7454)
5f156d73bd is described below
commit 5f156d73bd66a2727e83f0f690a651bb7275f71e
Author: Jonas(Rushan) Jiang <ru...@andrew.cmu.edu>
AuthorDate: Fri Apr 28 11:42:58 2023 -0400
Delta: Fix snapshotDataFilesCount and use Immutable for result (#7454)
---
build.gradle | 2 +
.../iceberg/delta/TestSnapshotDeltaLakeTable.java | 62 +++++++++++++++-------
.../delta/BaseSnapshotDeltaLakeTableAction.java | 34 ++++++------
.../BaseSnapshotDeltaLakeTableActionResult.java | 33 ------------
.../iceberg/delta/SnapshotDeltaLakeTable.java | 3 ++
5 files changed, 68 insertions(+), 66 deletions(-)
diff --git a/build.gradle b/build.gradle
index cef0cfbe0a..23402319ee 100644
--- a/build.gradle
+++ b/build.gradle
@@ -537,6 +537,8 @@ project(':iceberg-delta-lake') {
implementation project(':iceberg-core')
implementation project(':iceberg-parquet')
implementation "com.fasterxml.jackson.core:jackson-databind"
+ annotationProcessor "org.immutables:value"
+ compileOnly "org.immutables:value"
compileOnly "io.delta:delta-standalone_${scalaVersion}"
diff --git a/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java b/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java
index 165ee3e894..bace582c06 100644
--- a/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java
+++ b/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java
@@ -25,6 +25,8 @@ import static org.apache.spark.sql.functions.expr;
import io.delta.standalone.DeltaLog;
import io.delta.standalone.Operation;
import io.delta.standalone.OptimisticTransaction;
+import io.delta.standalone.VersionLog;
+import io.delta.standalone.actions.Action;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.actions.RemoveFile;
import io.delta.standalone.exceptions.DeltaConcurrentModificationException;
@@ -34,6 +36,7 @@ import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Timestamp;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -43,7 +46,6 @@ import org.apache.commons.codec.net.URLCodec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -176,7 +178,8 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
spark, newTableIdentifier, partitionedLocation)
.execute();
- checkSnapshotIntegrity(partitionedLocation, partitionedIdentifier, newTableIdentifier, result);
+ checkSnapshotIntegrity(
+ partitionedLocation, partitionedIdentifier, newTableIdentifier, result, 0);
checkTagContentAndOrder(partitionedLocation, newTableIdentifier, 0);
checkIcebergTableLocation(newTableIdentifier, partitionedLocation);
}
@@ -197,7 +200,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
.execute();
checkSnapshotIntegrity(
- unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result);
+ unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result, 0);
checkTagContentAndOrder(unpartitionedLocation, newTableIdentifier, 0);
checkIcebergTableLocation(newTableIdentifier, unpartitionedLocation);
}
@@ -219,7 +222,8 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
.tableLocation(newIcebergTableLocation)
.execute();
- checkSnapshotIntegrity(partitionedLocation, partitionedIdentifier, newTableIdentifier, result);
+ checkSnapshotIntegrity(
+ partitionedLocation, partitionedIdentifier, newTableIdentifier, result, 0);
checkTagContentAndOrder(partitionedLocation, newTableIdentifier, 0);
checkIcebergTableLocation(newTableIdentifier, newIcebergTableLocation);
}
@@ -251,7 +255,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
.execute();
checkSnapshotIntegrity(
- unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result);
+ unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result, 0);
checkTagContentAndOrder(unpartitionedLocation, newTableIdentifier, 0);
checkIcebergTableLocation(newTableIdentifier, unpartitionedLocation);
checkIcebergTableProperties(
@@ -285,7 +289,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
spark, newTableIdentifier, externalDataFilesTableLocation)
.execute();
checkSnapshotIntegrity(
- externalDataFilesTableLocation, externalDataFilesIdentifier, newTableIdentifier, result);
+ externalDataFilesTableLocation, externalDataFilesIdentifier, newTableIdentifier, result, 0);
checkTagContentAndOrder(externalDataFilesTableLocation, newTableIdentifier, 0);
checkIcebergTableLocation(newTableIdentifier, externalDataFilesTableLocation);
checkDataFilePathsIntegrity(newTableIdentifier, externalDataFilesTableLocation);
@@ -301,15 +305,12 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
SnapshotDeltaLakeTable.Result result =
DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable(
spark, newTableIdentifier, typeTestTableLocation)
- .tableProperty(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false")
.execute();
- checkSnapshotIntegrity(typeTestTableLocation, typeTestIdentifier, newTableIdentifier, result);
+ checkSnapshotIntegrity(
+ typeTestTableLocation, typeTestIdentifier, newTableIdentifier, result, 0);
checkTagContentAndOrder(typeTestTableLocation, newTableIdentifier, 0);
checkIcebergTableLocation(newTableIdentifier, typeTestTableLocation);
- checkIcebergTableProperties(
- newTableIdentifier,
- ImmutableMap.of(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false"),
- typeTestTableLocation);
+ checkIcebergTableProperties(newTableIdentifier, ImmutableMap.of(), typeTestTableLocation);
}
@Test
@@ -342,9 +343,9 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
spark, newTableIdentifier, vacuumTestTableLocation)
.execute();
checkSnapshotIntegrity(
- vacuumTestTableLocation, vacuumTestIdentifier, newTableIdentifier, result);
- checkIcebergTableLocation(newTableIdentifier, vacuumTestTableLocation);
+ vacuumTestTableLocation, vacuumTestIdentifier, newTableIdentifier, result, 13);
checkTagContentAndOrder(vacuumTestTableLocation, newTableIdentifier, 13);
+ checkIcebergTableLocation(newTableIdentifier, vacuumTestTableLocation);
}
@Test
@@ -376,7 +377,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
spark, newTableIdentifier, logCleanTestTableLocation)
.execute();
checkSnapshotIntegrity(
- logCleanTestTableLocation, logCleanTestIdentifier, newTableIdentifier, result);
+ logCleanTestTableLocation, logCleanTestIdentifier, newTableIdentifier, result, 10);
checkTagContentAndOrder(logCleanTestTableLocation, newTableIdentifier, 10);
checkIcebergTableLocation(newTableIdentifier, logCleanTestTableLocation);
}
@@ -385,7 +386,8 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
String deltaTableLocation,
String deltaTableIdentifier,
String icebergTableIdentifier,
- SnapshotDeltaLakeTable.Result snapshotReport) {
+ SnapshotDeltaLakeTable.Result snapshotReport,
+ long firstConstructableVersion) {
DeltaLog deltaLog = DeltaLog.forTable(spark.sessionState().newHadoopConf(), deltaTableLocation);
List<Row> deltaTableContents =
@@ -394,8 +396,8 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
spark.sql("SELECT * FROM " + icebergTableIdentifier).collectAsList();
Assertions.assertThat(deltaTableContents).hasSize(icebergTableContents.size());
- Assertions.assertThat(deltaLog.update().getAllFiles())
- .hasSize((int) snapshotReport.snapshotDataFilesCount());
+ Assertions.assertThat(snapshotReport.snapshotDataFilesCount())
+ .isEqualTo(countDataFilesInDeltaLakeTable(deltaLog, firstConstructableVersion));
Assertions.assertThat(icebergTableContents)
.containsExactlyInAnyOrderElementsOf(deltaTableContents);
}
@@ -555,4 +557,28 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
df.write().format("delta").mode(SaveMode.Append).option("path", path).saveAsTable(identifier);
}
}
+
+ private long countDataFilesInDeltaLakeTable(DeltaLog deltaLog, long firstConstructableVersion) {
+ long dataFilesCount = 0;
+
+ List<AddFile> initialDataFiles =
+ deltaLog.getSnapshotForVersionAsOf(firstConstructableVersion).getAllFiles();
+ dataFilesCount += initialDataFiles.size();
+
+ Iterator<VersionLog> versionLogIterator =
+ deltaLog.getChanges(
+ firstConstructableVersion + 1, false // not throw exception when data loss detected
+ );
+
+ while (versionLogIterator.hasNext()) {
+ VersionLog versionLog = versionLogIterator.next();
+ List<Action> addFiles =
+ versionLog.getActions().stream()
+ .filter(action -> action instanceof AddFile)
+ .collect(Collectors.toList());
+ dataFilesCount += addFiles.size();
+ }
+
+ return dataFilesCount;
+ }
}
diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java
index abc2708b21..8cc04f4c36 100644
--- a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java
+++ b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java
@@ -46,8 +46,6 @@ import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.Transaction;
@@ -63,6 +61,7 @@ import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.slf4j.Logger;
@@ -147,7 +146,7 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
}
@Override
- public Result execute() {
+ public SnapshotDeltaLakeTable.Result execute() {
Preconditions.checkArgument(
icebergCatalog != null && newTableIdentifier != null,
"Iceberg catalog and identifier cannot be null. Make sure to configure the action with a valid Iceberg catalog and identifier.");
@@ -158,6 +157,7 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
deltaLog.tableExists(),
"Delta Lake table does not exist at the given location: %s",
deltaTableLocation);
+ ImmutableSet.Builder<String> migratedDataFilesBuilder = ImmutableSet.builder();
io.delta.standalone.Snapshot updatedSnapshot = deltaLog.update();
Schema schema = convertDeltaLakeSchema(updatedSnapshot.getMetadata().getSchema());
PartitionSpec partitionSpec = getPartitionSpecFromDeltaSnapshot(schema, updatedSnapshot);
@@ -177,29 +177,27 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
.commit();
long constructableStartVersion =
commitInitialDeltaSnapshotToIcebergTransaction(
- updatedSnapshot.getVersion(), icebergTransaction);
+ updatedSnapshot.getVersion(), icebergTransaction, migratedDataFilesBuilder);
Iterator<VersionLog> versionLogIterator =
deltaLog.getChanges(
constructableStartVersion + 1, false // not throw exception when data loss detected
);
while (versionLogIterator.hasNext()) {
VersionLog versionLog = versionLogIterator.next();
- commitDeltaVersionLogToIcebergTransaction(versionLog, icebergTransaction);
+ commitDeltaVersionLogToIcebergTransaction(
+ versionLog, icebergTransaction, migratedDataFilesBuilder);
}
-
- Snapshot icebergSnapshot = icebergTransaction.table().currentSnapshot();
- long totalDataFiles =
- icebergSnapshot != null
- ? Long.parseLong(icebergSnapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP))
- : 0;
-
icebergTransaction.commitTransaction();
+
+ long totalDataFiles = migratedDataFilesBuilder.build().size();
LOG.info(
"Successfully created Iceberg table {} from Delta Lake table at {}, total data file count: {}",
newTableIdentifier,
deltaTableLocation,
totalDataFiles);
- return new BaseSnapshotDeltaLakeTableActionResult(totalDataFiles);
+ return ImmutableSnapshotDeltaLakeTable.Result.builder()
+ .snapshotDataFilesCount(totalDataFiles)
+ .build();
}
private Schema convertDeltaLakeSchema(io.delta.standalone.types.StructType deltaSchema) {
@@ -241,7 +239,9 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
* @return the initial version of the delta lake table that is successfully committed to iceberg
*/
private long commitInitialDeltaSnapshotToIcebergTransaction(
- long latestVersion, Transaction transaction) {
+ long latestVersion,
+ Transaction transaction,
+ ImmutableSet.Builder<String> migratedDataFilesBuilder) {
long constructableStartVersion = deltaStartVersion;
while (constructableStartVersion <= latestVersion) {
try {
@@ -251,6 +251,7 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
for (AddFile addFile : initDataFiles) {
DataFile dataFile = buildDataFileFromAction(addFile, transaction.table());
filesToAdd.add(dataFile);
+ migratedDataFilesBuilder.add(dataFile.path().toString());
}
// AppendFiles case
@@ -286,7 +287,9 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
* @param transaction the iceberg table transaction to commit to
*/
private void commitDeltaVersionLogToIcebergTransaction(
- VersionLog versionLog, Transaction transaction) {
+ VersionLog versionLog,
+ Transaction transaction,
+ ImmutableSet.Builder<String> migratedDataFilesBuilder) {
// Only need actions related to data change: AddFile and RemoveFile
List<Action> dataFileActions =
versionLog.getActions().stream()
@@ -305,6 +308,7 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
throw new ValidationException(
"The action %s's is unsupported", action.getClass().getSimpleName());
}
+ migratedDataFilesBuilder.add(dataFile.path().toString());
}
if (filesToAdd.size() > 0 && filesToRemove.size() > 0) {
diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableActionResult.java b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableActionResult.java
deleted file mode 100644
index 53c9b0d7fe..0000000000
--- a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableActionResult.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.delta;
-
-class BaseSnapshotDeltaLakeTableActionResult implements SnapshotDeltaLakeTable.Result {
-
- private final long snapshotDataFilesCount;
-
- BaseSnapshotDeltaLakeTableActionResult(long snapshotDataFilesCount) {
- this.snapshotDataFilesCount = snapshotDataFilesCount;
- }
-
- @Override
- public long snapshotDataFilesCount() {
- return snapshotDataFilesCount;
- }
-}
diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/SnapshotDeltaLakeTable.java b/delta-lake/src/main/java/org/apache/iceberg/delta/SnapshotDeltaLakeTable.java
index 6f4bd726a5..6c3f2d7c97 100644
--- a/delta-lake/src/main/java/org/apache/iceberg/delta/SnapshotDeltaLakeTable.java
+++ b/delta-lake/src/main/java/org/apache/iceberg/delta/SnapshotDeltaLakeTable.java
@@ -23,8 +23,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.actions.Action;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.immutables.value.Value;
/** Snapshot an existing Delta Lake table to Iceberg in place. */
+@Value.Enclosing
public interface SnapshotDeltaLakeTable
extends Action<SnapshotDeltaLakeTable, SnapshotDeltaLakeTable.Result> {
@@ -81,6 +83,7 @@ public interface SnapshotDeltaLakeTable
SnapshotDeltaLakeTable deltaLakeConfiguration(Configuration conf);
/** The action result that contains a summary of the execution. */
+ @Value.Immutable
interface Result {
/** Returns the number of migrated data files. */