You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2023/04/28 15:43:05 UTC

[iceberg] branch master updated: Delta: Fix snapshotDataFilesCount and use Immutable for result (#7454)

This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f156d73bd Delta: Fix snapshotDataFilesCount and use Immutable for result (#7454)
5f156d73bd is described below

commit 5f156d73bd66a2727e83f0f690a651bb7275f71e
Author: Jonas(Rushan) Jiang <ru...@andrew.cmu.edu>
AuthorDate: Fri Apr 28 11:42:58 2023 -0400

    Delta: Fix snapshotDataFilesCount and use Immutable for result (#7454)
---
 build.gradle                                       |  2 +
 .../iceberg/delta/TestSnapshotDeltaLakeTable.java  | 62 +++++++++++++++-------
 .../delta/BaseSnapshotDeltaLakeTableAction.java    | 34 ++++++------
 .../BaseSnapshotDeltaLakeTableActionResult.java    | 33 ------------
 .../iceberg/delta/SnapshotDeltaLakeTable.java      |  3 ++
 5 files changed, 68 insertions(+), 66 deletions(-)

diff --git a/build.gradle b/build.gradle
index cef0cfbe0a..23402319ee 100644
--- a/build.gradle
+++ b/build.gradle
@@ -537,6 +537,8 @@ project(':iceberg-delta-lake') {
     implementation project(':iceberg-core')
     implementation project(':iceberg-parquet')
     implementation "com.fasterxml.jackson.core:jackson-databind"
+    annotationProcessor "org.immutables:value"
+    compileOnly "org.immutables:value"
 
     compileOnly "io.delta:delta-standalone_${scalaVersion}"
 
diff --git a/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java b/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java
index 165ee3e894..bace582c06 100644
--- a/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java
+++ b/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java
@@ -25,6 +25,8 @@ import static org.apache.spark.sql.functions.expr;
 import io.delta.standalone.DeltaLog;
 import io.delta.standalone.Operation;
 import io.delta.standalone.OptimisticTransaction;
+import io.delta.standalone.VersionLog;
+import io.delta.standalone.actions.Action;
 import io.delta.standalone.actions.AddFile;
 import io.delta.standalone.actions.RemoveFile;
 import io.delta.standalone.exceptions.DeltaConcurrentModificationException;
@@ -34,6 +36,7 @@ import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.sql.Timestamp;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -43,7 +46,6 @@ import org.apache.commons.codec.net.URLCodec;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -176,7 +178,8 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
                 spark, newTableIdentifier, partitionedLocation)
             .execute();
 
-    checkSnapshotIntegrity(partitionedLocation, partitionedIdentifier, newTableIdentifier, result);
+    checkSnapshotIntegrity(
+        partitionedLocation, partitionedIdentifier, newTableIdentifier, result, 0);
     checkTagContentAndOrder(partitionedLocation, newTableIdentifier, 0);
     checkIcebergTableLocation(newTableIdentifier, partitionedLocation);
   }
@@ -197,7 +200,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
             .execute();
 
     checkSnapshotIntegrity(
-        unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result);
+        unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result, 0);
     checkTagContentAndOrder(unpartitionedLocation, newTableIdentifier, 0);
     checkIcebergTableLocation(newTableIdentifier, unpartitionedLocation);
   }
@@ -219,7 +222,8 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
             .tableLocation(newIcebergTableLocation)
             .execute();
 
-    checkSnapshotIntegrity(partitionedLocation, partitionedIdentifier, newTableIdentifier, result);
+    checkSnapshotIntegrity(
+        partitionedLocation, partitionedIdentifier, newTableIdentifier, result, 0);
     checkTagContentAndOrder(partitionedLocation, newTableIdentifier, 0);
     checkIcebergTableLocation(newTableIdentifier, newIcebergTableLocation);
   }
@@ -251,7 +255,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
             .execute();
 
     checkSnapshotIntegrity(
-        unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result);
+        unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result, 0);
     checkTagContentAndOrder(unpartitionedLocation, newTableIdentifier, 0);
     checkIcebergTableLocation(newTableIdentifier, unpartitionedLocation);
     checkIcebergTableProperties(
@@ -285,7 +289,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
                 spark, newTableIdentifier, externalDataFilesTableLocation)
             .execute();
     checkSnapshotIntegrity(
-        externalDataFilesTableLocation, externalDataFilesIdentifier, newTableIdentifier, result);
+        externalDataFilesTableLocation, externalDataFilesIdentifier, newTableIdentifier, result, 0);
     checkTagContentAndOrder(externalDataFilesTableLocation, newTableIdentifier, 0);
     checkIcebergTableLocation(newTableIdentifier, externalDataFilesTableLocation);
     checkDataFilePathsIntegrity(newTableIdentifier, externalDataFilesTableLocation);
@@ -301,15 +305,12 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
     SnapshotDeltaLakeTable.Result result =
         DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable(
                 spark, newTableIdentifier, typeTestTableLocation)
-            .tableProperty(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false")
             .execute();
-    checkSnapshotIntegrity(typeTestTableLocation, typeTestIdentifier, newTableIdentifier, result);
+    checkSnapshotIntegrity(
+        typeTestTableLocation, typeTestIdentifier, newTableIdentifier, result, 0);
     checkTagContentAndOrder(typeTestTableLocation, newTableIdentifier, 0);
     checkIcebergTableLocation(newTableIdentifier, typeTestTableLocation);
-    checkIcebergTableProperties(
-        newTableIdentifier,
-        ImmutableMap.of(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false"),
-        typeTestTableLocation);
+    checkIcebergTableProperties(newTableIdentifier, ImmutableMap.of(), typeTestTableLocation);
   }
 
   @Test
@@ -342,9 +343,9 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
                 spark, newTableIdentifier, vacuumTestTableLocation)
             .execute();
     checkSnapshotIntegrity(
-        vacuumTestTableLocation, vacuumTestIdentifier, newTableIdentifier, result);
-    checkIcebergTableLocation(newTableIdentifier, vacuumTestTableLocation);
+        vacuumTestTableLocation, vacuumTestIdentifier, newTableIdentifier, result, 13);
     checkTagContentAndOrder(vacuumTestTableLocation, newTableIdentifier, 13);
+    checkIcebergTableLocation(newTableIdentifier, vacuumTestTableLocation);
   }
 
   @Test
@@ -376,7 +377,7 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
                 spark, newTableIdentifier, logCleanTestTableLocation)
             .execute();
     checkSnapshotIntegrity(
-        logCleanTestTableLocation, logCleanTestIdentifier, newTableIdentifier, result);
+        logCleanTestTableLocation, logCleanTestIdentifier, newTableIdentifier, result, 10);
     checkTagContentAndOrder(logCleanTestTableLocation, newTableIdentifier, 10);
     checkIcebergTableLocation(newTableIdentifier, logCleanTestTableLocation);
   }
@@ -385,7 +386,8 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
       String deltaTableLocation,
       String deltaTableIdentifier,
       String icebergTableIdentifier,
-      SnapshotDeltaLakeTable.Result snapshotReport) {
+      SnapshotDeltaLakeTable.Result snapshotReport,
+      long firstConstructableVersion) {
     DeltaLog deltaLog = DeltaLog.forTable(spark.sessionState().newHadoopConf(), deltaTableLocation);
 
     List<Row> deltaTableContents =
@@ -394,8 +396,8 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
         spark.sql("SELECT * FROM " + icebergTableIdentifier).collectAsList();
 
     Assertions.assertThat(deltaTableContents).hasSize(icebergTableContents.size());
-    Assertions.assertThat(deltaLog.update().getAllFiles())
-        .hasSize((int) snapshotReport.snapshotDataFilesCount());
+    Assertions.assertThat(snapshotReport.snapshotDataFilesCount())
+        .isEqualTo(countDataFilesInDeltaLakeTable(deltaLog, firstConstructableVersion));
     Assertions.assertThat(icebergTableContents)
         .containsExactlyInAnyOrderElementsOf(deltaTableContents);
   }
@@ -555,4 +557,28 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
       df.write().format("delta").mode(SaveMode.Append).option("path", path).saveAsTable(identifier);
     }
   }
+
+  /**
+   * Counts the data files the snapshot action is expected to report: the files of the snapshot
+   * at {@code firstConstructableVersion}, plus every AddFile action logged in the versions
+   * that follow it.
+   */
+  private long countDataFilesInDeltaLakeTable(DeltaLog deltaLog, long firstConstructableVersion) {
+    // Files listed by the Delta snapshot at the starting version.
+    long dataFilesCount =
+        deltaLog.getSnapshotForVersionAsOf(firstConstructableVersion).getAllFiles().size();
+
+    // Later versions; 'false' = do not throw an exception when data loss is detected.
+    Iterator<VersionLog> versionLogIterator =
+        deltaLog.getChanges(firstConstructableVersion + 1, false);
+
+    while (versionLogIterator.hasNext()) {
+      List<Action> versionActions = versionLogIterator.next().getActions();
+      // count() tallies the AddFile actions without building a throwaway list.
+      dataFilesCount +=
+          versionActions.stream().filter(action -> action instanceof AddFile).count();
+    }
+
+    return dataFilesCount;
+  }
 }
diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java
index abc2708b21..8cc04f4c36 100644
--- a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java
+++ b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java
@@ -46,8 +46,6 @@ import org.apache.iceberg.OverwriteFiles;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.Transaction;
@@ -63,6 +61,7 @@ import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Type;
 import org.slf4j.Logger;
@@ -147,7 +146,7 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
   }
 
   @Override
-  public Result execute() {
+  public SnapshotDeltaLakeTable.Result execute() {
     Preconditions.checkArgument(
         icebergCatalog != null && newTableIdentifier != null,
         "Iceberg catalog and identifier cannot be null. Make sure to configure the action with a valid Iceberg catalog and identifier.");
@@ -158,6 +157,7 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
         deltaLog.tableExists(),
         "Delta Lake table does not exist at the given location: %s",
         deltaTableLocation);
+    ImmutableSet.Builder<String> migratedDataFilesBuilder = ImmutableSet.builder();
     io.delta.standalone.Snapshot updatedSnapshot = deltaLog.update();
     Schema schema = convertDeltaLakeSchema(updatedSnapshot.getMetadata().getSchema());
     PartitionSpec partitionSpec = getPartitionSpecFromDeltaSnapshot(schema, updatedSnapshot);
@@ -177,29 +177,27 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
         .commit();
     long constructableStartVersion =
         commitInitialDeltaSnapshotToIcebergTransaction(
-            updatedSnapshot.getVersion(), icebergTransaction);
+            updatedSnapshot.getVersion(), icebergTransaction, migratedDataFilesBuilder);
     Iterator<VersionLog> versionLogIterator =
         deltaLog.getChanges(
             constructableStartVersion + 1, false // not throw exception when data loss detected
             );
     while (versionLogIterator.hasNext()) {
       VersionLog versionLog = versionLogIterator.next();
-      commitDeltaVersionLogToIcebergTransaction(versionLog, icebergTransaction);
+      commitDeltaVersionLogToIcebergTransaction(
+          versionLog, icebergTransaction, migratedDataFilesBuilder);
     }
-
-    Snapshot icebergSnapshot = icebergTransaction.table().currentSnapshot();
-    long totalDataFiles =
-        icebergSnapshot != null
-            ? Long.parseLong(icebergSnapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP))
-            : 0;
-
     icebergTransaction.commitTransaction();
+
+    long totalDataFiles = migratedDataFilesBuilder.build().size();
     LOG.info(
         "Successfully created Iceberg table {} from Delta Lake table at {}, total data file count: {}",
         newTableIdentifier,
         deltaTableLocation,
         totalDataFiles);
-    return new BaseSnapshotDeltaLakeTableActionResult(totalDataFiles);
+    return ImmutableSnapshotDeltaLakeTable.Result.builder()
+        .snapshotDataFilesCount(totalDataFiles)
+        .build();
   }
 
   private Schema convertDeltaLakeSchema(io.delta.standalone.types.StructType deltaSchema) {
@@ -241,7 +239,9 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
    * @return the initial version of the delta lake table that is successfully committed to iceberg
    */
   private long commitInitialDeltaSnapshotToIcebergTransaction(
-      long latestVersion, Transaction transaction) {
+      long latestVersion,
+      Transaction transaction,
+      ImmutableSet.Builder<String> migratedDataFilesBuilder) {
     long constructableStartVersion = deltaStartVersion;
     while (constructableStartVersion <= latestVersion) {
       try {
@@ -251,6 +251,7 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
         for (AddFile addFile : initDataFiles) {
           DataFile dataFile = buildDataFileFromAction(addFile, transaction.table());
           filesToAdd.add(dataFile);
+          migratedDataFilesBuilder.add(dataFile.path().toString());
         }
 
         // AppendFiles case
@@ -286,7 +287,9 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
    * @param transaction the iceberg table transaction to commit to
    */
   private void commitDeltaVersionLogToIcebergTransaction(
-      VersionLog versionLog, Transaction transaction) {
+      VersionLog versionLog,
+      Transaction transaction,
+      ImmutableSet.Builder<String> migratedDataFilesBuilder) {
     // Only need actions related to data change: AddFile and RemoveFile
     List<Action> dataFileActions =
         versionLog.getActions().stream()
@@ -305,6 +308,7 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
         throw new ValidationException(
             "The action %s's is unsupported", action.getClass().getSimpleName());
       }
+      migratedDataFilesBuilder.add(dataFile.path().toString());
     }
 
     if (filesToAdd.size() > 0 && filesToRemove.size() > 0) {
diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableActionResult.java b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableActionResult.java
deleted file mode 100644
index 53c9b0d7fe..0000000000
--- a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableActionResult.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.delta;
-
-class BaseSnapshotDeltaLakeTableActionResult implements SnapshotDeltaLakeTable.Result {
-
-  private final long snapshotDataFilesCount;
-
-  BaseSnapshotDeltaLakeTableActionResult(long snapshotDataFilesCount) {
-    this.snapshotDataFilesCount = snapshotDataFilesCount;
-  }
-
-  @Override
-  public long snapshotDataFilesCount() {
-    return snapshotDataFilesCount;
-  }
-}
diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/SnapshotDeltaLakeTable.java b/delta-lake/src/main/java/org/apache/iceberg/delta/SnapshotDeltaLakeTable.java
index 6f4bd726a5..6c3f2d7c97 100644
--- a/delta-lake/src/main/java/org/apache/iceberg/delta/SnapshotDeltaLakeTable.java
+++ b/delta-lake/src/main/java/org/apache/iceberg/delta/SnapshotDeltaLakeTable.java
@@ -23,8 +23,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.actions.Action;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.immutables.value.Value;
 
 /** Snapshot an existing Delta Lake table to Iceberg in place. */
+@Value.Enclosing
 public interface SnapshotDeltaLakeTable
     extends Action<SnapshotDeltaLakeTable, SnapshotDeltaLakeTable.Result> {
 
@@ -81,6 +83,7 @@ public interface SnapshotDeltaLakeTable
   SnapshotDeltaLakeTable deltaLakeConfiguration(Configuration conf);
 
   /** The action result that contains a summary of the execution. */
+  @Value.Immutable
   interface Result {
 
     /** Returns the number of migrated data files. */