Posted to commits@impala.apache.org by bo...@apache.org on 2024/01/18 21:43:21 UTC

(impala) branch master updated: IMPALA-12708: An UPDATE creates 2 new snapshots in Iceberg tables

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new b372f87b6 IMPALA-12708: An UPDATE creates 2 new snapshots in Iceberg tables
b372f87b6 is described below

commit b372f87b620b9d240059fb6e098f62685c14e15e
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Mon Jan 15 18:49:19 2024 +0100

    IMPALA-12708: An UPDATE creates 2 new snapshots in Iceberg tables
    
    The current implementation of UPDATE creates the delete file(s) and the
    new data file(s) for the updated row(s). These files are committed in
    one Iceberg transaction, but the transaction adds two snapshots to the
    table. The first contains the delete file(s), the second adds the new
    data file(s) of the updated row(s). Only the final snapshot (which
    holds the consistent table state) is observable by concurrent readers,
    but the commit history can still look strange with these "phantom
    snapshots".
    
    So instead of performing a RowDelta and an AppendFiles operation in a
    single transaction, with this change we perform a single RowDelta
    operation only.
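
    For illustration only (not part of this patch), a minimal Java sketch of
    the two Iceberg API patterns; 'table', 'deleteFiles', 'dataFiles' and
    'initialSnapshotId' are placeholder inputs:

        import java.util.List;

        import org.apache.iceberg.AppendFiles;
        import org.apache.iceberg.DataFile;
        import org.apache.iceberg.DeleteFile;
        import org.apache.iceberg.RowDelta;
        import org.apache.iceberg.Table;
        import org.apache.iceberg.Transaction;

        class UpdateSnapshotSketch {
          // Old pattern: the deletes and the new data files are committed by
          // two operations inside one transaction. Each operation creates its
          // own snapshot, so every UPDATE adds two snapshots to the history.
          static void updateWithTwoSnapshots(Table table,
              List<DeleteFile> deleteFiles, List<DataFile> dataFiles) {
            Transaction txn = table.newTransaction();
            RowDelta rowDelta = txn.newRowDelta();
            deleteFiles.forEach(rowDelta::addDeletes);
            rowDelta.commit();
            AppendFiles append = txn.newAppend();
            dataFiles.forEach(append::appendFile);
            append.commit();
            txn.commitTransaction();
          }

          // New pattern: a single RowDelta carries both the position-delete
          // files and the new data files, so the UPDATE adds one snapshot.
          static void updateWithOneSnapshot(Table table, long initialSnapshotId,
              List<DeleteFile> deleteFiles, List<DataFile> dataFiles) {
            Transaction txn = table.newTransaction();
            RowDelta rowDelta = txn.newRowDelta();
            deleteFiles.forEach(rowDelta::addDeletes);
            dataFiles.forEach(rowDelta::addRows);
            rowDelta.validateFromSnapshot(initialSnapshotId);
            rowDelta.validateNoConflictingDataFiles();
            rowDelta.commit();
            txn.commitTransaction();
          }
        }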
    
    Another issue was that we also committed empty operations (e.g. UPDATEs
    that affect zero rows). These created redundant snapshots in the table
    history. This patch also fixes that.
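
    Conceptually (again only a sketch, not code from this patch), the no-op
    guard boils down to:

        import java.util.List;

        import org.apache.iceberg.DataFile;
        import org.apache.iceberg.DeleteFile;

        class NoOpDmlGuard {
          // Hypothetical helper: a DML that produced neither data files nor
          // delete files changed nothing, so skipping the commit avoids
          // adding a redundant snapshot to the table history.
          static boolean shouldCommit(List<DataFile> dataFiles,
              List<DeleteFile> deleteFiles) {
            return !(dataFiles.isEmpty() && deleteFiles.isEmpty());
          }
        }

    In the patch the equivalent check is done on the Impala side, in
    ClientRequestState::CreateIcebergCatalogOps() below.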
    
    Testing:
     * added e2e test that checks the table history
    
    Change-Id: I2ceb80b939c644388707b21061bf55451234dcd3
    Reviewed-on: http://gerrit.cloudera.org:8080/20903
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
---
 be/src/service/client-request-state.cc             |  54 ++++++----
 be/src/service/client-request-state.h              |   5 +
 .../impala/service/IcebergCatalogOpExecutor.java   | 118 +++++++++++++--------
 tests/query_test/test_iceberg.py                   |  17 +++
 4 files changed, 134 insertions(+), 60 deletions(-)

diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index aa6da5170..13429e7cc 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1548,26 +1548,11 @@ Status ClientRequestState::UpdateCatalog() {
         catalog_update.__set_write_id(finalize_params.write_id);
       }
       if (finalize_params.__isset.iceberg_params) {
-        const TIcebergDmlFinalizeParams& ice_finalize_params =
-            finalize_params.iceberg_params;
         TIcebergOperationParam& cat_ice_op = catalog_update.iceberg_operation;
         catalog_update.__isset.iceberg_operation = true;
-        cat_ice_op.__set_operation(ice_finalize_params.operation);
-        cat_ice_op.__set_initial_snapshot_id(
-            ice_finalize_params.initial_snapshot_id);
-        cat_ice_op.__set_spec_id(ice_finalize_params.spec_id);
-        if (ice_finalize_params.operation == TIcebergOperation::INSERT) {
-          cat_ice_op.__set_iceberg_data_files_fb(
-              dml_exec_state->CreateIcebergDataFilesVector());
-          cat_ice_op.__set_is_overwrite(finalize_params.is_overwrite);
-        } else if (ice_finalize_params.operation == TIcebergOperation::DELETE) {
-          cat_ice_op.__set_iceberg_delete_files_fb(
-              dml_exec_state->CreateIcebergDeleteFilesVector());
-        } else if (ice_finalize_params.operation == TIcebergOperation::UPDATE) {
-          cat_ice_op.__set_iceberg_data_files_fb(
-              dml_exec_state->CreateIcebergDataFilesVector());
-          cat_ice_op.__set_iceberg_delete_files_fb(
-              dml_exec_state->CreateIcebergDeleteFilesVector());
+        if (!CreateIcebergCatalogOps(finalize_params, &cat_ice_op)) {
+          // No change, no need to update catalog.
+          return Status::OK();
         }
       }
 
@@ -1617,6 +1602,39 @@ Status ClientRequestState::UpdateCatalog() {
   return Status::OK();
 }
 
+bool ClientRequestState::CreateIcebergCatalogOps(
+    const TFinalizeParams& finalize_params, TIcebergOperationParam* cat_ice_op) {
+  DCHECK(cat_ice_op != nullptr);
+  const TIcebergDmlFinalizeParams& ice_finalize_params = finalize_params.iceberg_params;
+  DmlExecState* dml_exec_state = GetCoordinator()->dml_exec_state();
+  bool update_catalog = true;
+  cat_ice_op->__set_operation(ice_finalize_params.operation);
+  cat_ice_op->__set_initial_snapshot_id(
+      ice_finalize_params.initial_snapshot_id);
+  cat_ice_op->__set_spec_id(ice_finalize_params.spec_id);
+  if (ice_finalize_params.operation == TIcebergOperation::INSERT) {
+    cat_ice_op->__set_iceberg_data_files_fb(
+        dml_exec_state->CreateIcebergDataFilesVector());
+    cat_ice_op->__set_is_overwrite(finalize_params.is_overwrite);
+    if (cat_ice_op->iceberg_data_files_fb.empty()) update_catalog = false;
+  } else if (ice_finalize_params.operation == TIcebergOperation::DELETE) {
+    cat_ice_op->__set_iceberg_delete_files_fb(
+        dml_exec_state->CreateIcebergDeleteFilesVector());
+    if (cat_ice_op->iceberg_delete_files_fb.empty()) update_catalog = false;
+  } else if (ice_finalize_params.operation == TIcebergOperation::UPDATE) {
+    cat_ice_op->__set_iceberg_data_files_fb(
+        dml_exec_state->CreateIcebergDataFilesVector());
+    cat_ice_op->__set_iceberg_delete_files_fb(
+        dml_exec_state->CreateIcebergDeleteFilesVector());
+    if (cat_ice_op->iceberg_delete_files_fb.empty()) {
+      DCHECK(cat_ice_op->iceberg_data_files_fb.empty());
+      update_catalog = false;
+    }
+  }
+  if (!update_catalog) query_events_->MarkEvent("No-op Iceberg DML statement");
+  return update_catalog;
+}
+
 void ClientRequestState::SetResultSet(const TDdlExecResponse* ddl_resp) {
   if (ddl_resp != NULL && ddl_resp->__isset.result_set) {
     result_metadata_ = ddl_resp->result_set.schema;
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index a94304872..7b618f072 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -803,6 +803,11 @@ class ClientRequestState {
   /// the transaction will be closed when this function returns.
   Status UpdateCatalog() WARN_UNUSED_RESULT;
 
+  /// Fills 'cat_ice_op' based on 'finalize_params'. Returns false if there is no
+  /// need to update the Catalog (no records changed), returns true otherwise.
+  bool CreateIcebergCatalogOps(const TFinalizeParams& finalize_params,
+      TIcebergOperationParam* cat_ice_op);
+
   /// Copies results into request_result_set_
   /// TODO: Have the FE return list<Data.TResultRow> so that this isn't necessary
   void SetResultSet(const TDdlExecResponse* ddl_resp);
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 32e0faff6..27a62dea6 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -22,10 +22,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.ExpireSnapshots;
 import org.apache.iceberg.FileMetadata;
@@ -61,6 +63,7 @@ import org.apache.impala.thrift.TAlterTableExecuteExpireSnapshotsParams;
 import org.apache.impala.thrift.TAlterTableExecuteRollbackParams;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TIcebergCatalog;
+import org.apache.impala.thrift.TIcebergOperation;
 import org.apache.impala.thrift.TIcebergOperationParam;
 import org.apache.impala.thrift.TIcebergPartitionSpec;
 import org.apache.impala.thrift.TRollbackType;
@@ -349,41 +352,45 @@ public class IcebergCatalogOpExecutor {
     switch (icebergOp.operation) {
       case INSERT: appendFiles(feIcebergTable, txn, icebergOp); break;
       case DELETE: deleteRows(feIcebergTable, txn, icebergOp); break;
-      case UPDATE: {
-        deleteRows(feIcebergTable, txn, icebergOp);
-        appendFiles(feIcebergTable, txn, icebergOp);
-      } break;
+      case UPDATE: updateRows(feIcebergTable, txn, icebergOp); break;
       default: throw new ImpalaRuntimeException(
           "Unknown Iceberg operation: " + icebergOp.operation);
     }
   }
 
-  public static void deleteRows(FeIcebergTable feIcebergTable, Transaction txn,
+  private static void deleteRows(FeIcebergTable feIcebergTable, Transaction txn,
       TIcebergOperationParam icebergOp) throws ImpalaRuntimeException {
     org.apache.iceberg.Table nativeIcebergTable = feIcebergTable.getIcebergApiTable();
     List<ByteBuffer> deleteFilesFb = icebergOp.getIceberg_delete_files_fb();
     RowDelta rowDelta = txn.newRowDelta();
     for (ByteBuffer buf : deleteFilesFb) {
-      FbIcebergDataFile deleteFile = FbIcebergDataFile.getRootAsFbIcebergDataFile(buf);
-
-      PartitionSpec partSpec = nativeIcebergTable.specs().get(deleteFile.specId());
-      IcebergPartitionSpec impPartSpec = feIcebergTable.getPartitionSpec(
-          deleteFile.specId());
-      Metrics metrics = buildDataFileMetrics(deleteFile);
-      FileMetadata.Builder builder = FileMetadata.deleteFileBuilder(partSpec)
-          .ofPositionDeletes()
-          .withMetrics(metrics)
-          .withPath(deleteFile.path())
-          .withFormat(IcebergUtil.fbFileFormatToIcebergFileFormat(deleteFile.format()))
-          .withRecordCount(deleteFile.recordCount())
-          .withFileSizeInBytes(deleteFile.fileSizeInBytes());
-      IcebergUtil.PartitionData partitionData = IcebergUtil.partitionDataFromDataFile(
-          partSpec.partitionType(), impPartSpec, deleteFile);
-      if (partitionData != null) builder.withPartition(partitionData);
-      rowDelta.addDeletes(builder.build());
+      DeleteFile deleteFile = createDeleteFile(feIcebergTable, buf);
+      rowDelta.addDeletes(deleteFile);
     }
+    validateAndCommitRowDelta(rowDelta, icebergOp.getInitial_snapshot_id());
+  }
+
+  private static void updateRows(FeIcebergTable feIcebergTable, Transaction txn,
+      TIcebergOperationParam icebergOp) throws ImpalaRuntimeException {
+    org.apache.iceberg.Table nativeIcebergTable = feIcebergTable.getIcebergApiTable();
+    List<ByteBuffer> deleteFilesFb = icebergOp.getIceberg_delete_files_fb();
+    List<ByteBuffer> dataFilesFb = icebergOp.getIceberg_data_files_fb();
+    RowDelta rowDelta = txn.newRowDelta();
+    for (ByteBuffer buf : deleteFilesFb) {
+      DeleteFile deleteFile = createDeleteFile(feIcebergTable, buf);
+      rowDelta.addDeletes(deleteFile);
+    }
+    for (ByteBuffer buf : dataFilesFb) {
+      DataFile dataFile = createDataFile(feIcebergTable, buf);
+      rowDelta.addRows(dataFile);
+    }
+    validateAndCommitRowDelta(rowDelta, icebergOp.getInitial_snapshot_id());
+  }
+
+  private static void validateAndCommitRowDelta(RowDelta rowDelta,
+      long initialSnapshotId) throws ImpalaRuntimeException {
     try {
-      rowDelta.validateFromSnapshot(icebergOp.getInitial_snapshot_id());
+      rowDelta.validateFromSnapshot(initialSnapshotId);
       rowDelta.validateNoConflictingDataFiles();
       rowDelta.commit();
     } catch (ValidationException e) {
@@ -391,6 +398,50 @@ public class IcebergCatalogOpExecutor {
     }
   }
 
+  private static DataFile createDataFile(FeIcebergTable feIcebergTable, ByteBuffer buf)
+      throws ImpalaRuntimeException {
+    FbIcebergDataFile dataFile = FbIcebergDataFile.getRootAsFbIcebergDataFile(buf);
+
+    PartitionSpec partSpec = feIcebergTable.getIcebergApiTable().specs().get(
+        dataFile.specId());
+    IcebergPartitionSpec impPartSpec =
+        feIcebergTable.getPartitionSpec(dataFile.specId());
+    Metrics metrics = buildDataFileMetrics(dataFile);
+    DataFiles.Builder builder =
+        DataFiles.builder(partSpec)
+        .withMetrics(metrics)
+        .withPath(dataFile.path())
+        .withFormat(IcebergUtil.fbFileFormatToIcebergFileFormat(dataFile.format()))
+        .withRecordCount(dataFile.recordCount())
+        .withFileSizeInBytes(dataFile.fileSizeInBytes());
+    IcebergUtil.PartitionData partitionData = IcebergUtil.partitionDataFromDataFile(
+        partSpec.partitionType(), impPartSpec, dataFile);
+    if (partitionData != null) builder.withPartition(partitionData);
+    return builder.build();
+  }
+
+  private static DeleteFile createDeleteFile(FeIcebergTable feIcebergTable,
+      ByteBuffer buf) throws ImpalaRuntimeException {
+    FbIcebergDataFile deleteFile = FbIcebergDataFile.getRootAsFbIcebergDataFile(buf);
+
+    PartitionSpec partSpec = feIcebergTable.getIcebergApiTable().specs().get(
+        deleteFile.specId());
+    IcebergPartitionSpec impPartSpec = feIcebergTable.getPartitionSpec(
+        deleteFile.specId());
+    Metrics metrics = buildDataFileMetrics(deleteFile);
+    FileMetadata.Builder builder = FileMetadata.deleteFileBuilder(partSpec)
+        .ofPositionDeletes()
+        .withMetrics(metrics)
+        .withPath(deleteFile.path())
+        .withFormat(IcebergUtil.fbFileFormatToIcebergFileFormat(deleteFile.format()))
+        .withRecordCount(deleteFile.recordCount())
+        .withFileSizeInBytes(deleteFile.fileSizeInBytes());
+    IcebergUtil.PartitionData partitionData = IcebergUtil.partitionDataFromDataFile(
+        partSpec.partitionType(), impPartSpec, deleteFile);
+    if (partitionData != null) builder.withPartition(partitionData);
+    return builder.build();
+  }
+
   /**
    * Append the newly inserted data files to the Iceberg table using the AppendFiles
    * API.
@@ -406,25 +457,8 @@ public class IcebergCatalogOpExecutor {
       batchWrite = new Append(txn);
     }
     for (ByteBuffer buf : dataFilesFb) {
-      FbIcebergDataFile dataFile = FbIcebergDataFile.getRootAsFbIcebergDataFile(buf);
-      Preconditions.checkState(dataFile.specId() == icebergOp.getSpec_id());
-      int specId = icebergOp.getSpec_id();
-
-      PartitionSpec partSpec = nativeIcebergTable.specs().get(specId);
-      IcebergPartitionSpec impPartSpec =
-          feIcebergTable.getPartitionSpec(specId);
-      Metrics metrics = buildDataFileMetrics(dataFile);
-      DataFiles.Builder builder =
-          DataFiles.builder(partSpec)
-          .withMetrics(metrics)
-          .withPath(dataFile.path())
-          .withFormat(IcebergUtil.fbFileFormatToIcebergFileFormat(dataFile.format()))
-          .withRecordCount(dataFile.recordCount())
-          .withFileSizeInBytes(dataFile.fileSizeInBytes());
-      IcebergUtil.PartitionData partitionData = IcebergUtil.partitionDataFromDataFile(
-          partSpec.partitionType(), impPartSpec, dataFile);
-      if (partitionData != null) builder.withPartition(partitionData);
-      batchWrite.addFile(builder.build());
+      DataFile dataFile = createDataFile(feIcebergTable, buf);
+      batchWrite.addFile(dataFile);
     }
     try {
       batchWrite.commit();
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index d1737df78..814f328c6 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1516,9 +1516,26 @@ class TestIcebergV2Table(IcebergTestSuite):
   def test_update_basic(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-update-basic', vector,
         unique_database)
+    self._test_update_basic_snapshots(unique_database)
     if IS_HDFS and self.should_run_for_hive(vector):
       self._update_basic_hive_tests(unique_database)
 
+  def _test_update_basic_snapshots(self, db):
+    """Verifies that the tables have the expected number of snapshots, and
+    the parent ids match the previous snapshot ids. See IMPALA-12708."""
+    def validate_snapshots(tbl, expected_snapshots):
+      tbl_name = "{}.{}".format(db, tbl)
+      snapshots = get_snapshots(self.client, tbl_name,
+          expected_result_size=expected_snapshots)
+      parent_id = None
+      for s in snapshots:
+        assert s.get_parent_id() == parent_id
+        parent_id = s.get_snapshot_id()
+
+    validate_snapshots("single_col", 3)
+    validate_snapshots("ice_alltypes", 17)
+    validate_snapshots("ice_id_partitioned", 4)
+
   def _update_basic_hive_tests(self, db):
     def get_hive_results(tbl, order_by_col):
       stmt = "SELECT * FROM {}.{} ORDER BY {}".format(db, tbl, order_by_col)