You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text version.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/05/19 03:05:35 UTC

[iceberg] branch master updated: Spark 3.4: Minor refactoring for SparkPositionDeltaWrite (#7650)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 24490b0da3 Spark 3.4: Minor refactoring for SparkPositionDeltaWrite (#7650)
24490b0da3 is described below

commit 24490b0da3124bf28817130e64b2fe4db8ecd7fd
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Thu May 18 20:05:27 2023 -0700

    Spark 3.4: Minor refactoring for SparkPositionDeltaWrite (#7650)
---
 .../spark/source/SparkPositionDeltaWrite.java      | 55 +++++++++++++---------
 1 file changed, 32 insertions(+), 23 deletions(-)

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 416eb6a9ee..b5139af212 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -269,9 +269,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
 
       extraSnapshotMetadata.forEach(operation::set);
 
-      if (!CommitMetadata.commitProperties().isEmpty()) {
-        CommitMetadata.commitProperties().forEach(operation::set);
-      }
+      CommitMetadata.commitProperties().forEach(operation::set);
 
       if (wapEnabled && wapId != null) {
         // write-audit-publish is enabled for this table and job
@@ -386,10 +384,13 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
     protected Map<Integer, StructProjection> buildPartitionProjections(
         Types.StructType partitionType, Map<Integer, PartitionSpec> specs) {
       Map<Integer, StructProjection> partitionProjections = Maps.newHashMap();
-      specs.forEach(
-          (specID, spec) ->
-              partitionProjections.put(
-                  specID, StructProjection.create(partitionType, spec.partitionType())));
+
+      for (int specId : specs.keySet()) {
+        PartitionSpec spec = specs.get(specId);
+        StructProjection projection = StructProjection.create(partitionType, spec.partitionType());
+        partitionProjections.put(specId, projection);
+      }
+
       return partitionProjections;
     }
   }
@@ -425,12 +426,10 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
       this.partitionRowWrapper = initPartitionRowWrapper(partitionType);
       this.partitionProjections = buildPartitionProjections(partitionType, specs);
 
-      this.specIdOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.SPEC_ID.name());
-      this.partitionOrdinal =
-          context.metadataSparkType().fieldIndex(MetadataColumns.PARTITION_COLUMN_NAME);
-      this.fileOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.FILE_PATH.name());
-      this.positionOrdinal =
-          context.deleteSparkType().fieldIndex(MetadataColumns.ROW_POSITION.name());
+      this.specIdOrdinal = context.specIdOrdinal();
+      this.partitionOrdinal = context.partitionOrdinal();
+      this.fileOrdinal = context.fileOrdinal();
+      this.positionOrdinal = context.positionOrdinal();
     }
 
     @Override
@@ -517,12 +516,10 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
       this.deletePartitionRowWrapper = initPartitionRowWrapper(partitionType);
       this.deletePartitionProjections = buildPartitionProjections(partitionType, specs);
 
-      this.specIdOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.SPEC_ID.name());
-      this.partitionOrdinal =
-          context.metadataSparkType().fieldIndex(MetadataColumns.PARTITION_COLUMN_NAME);
-      this.fileOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.FILE_PATH.name());
-      this.positionOrdinal =
-          context.deleteSparkType().fieldIndex(MetadataColumns.ROW_POSITION.name());
+      this.specIdOrdinal = context.specIdOrdinal();
+      this.partitionOrdinal = context.partitionOrdinal();
+      this.fileOrdinal = context.fileOrdinal();
+      this.positionOrdinal = context.positionOrdinal();
     }
 
     @Override
@@ -713,10 +710,6 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
       return deleteSparkType;
     }
 
-    StructType metadataSparkType() {
-      return metadataSparkType;
-    }
-
     FileFormat deleteFileFormat() {
       return deleteFileFormat;
     }
@@ -732,5 +725,21 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
     String queryId() {
       return queryId;
     }
+
+    int specIdOrdinal() {
+      return metadataSparkType.fieldIndex(MetadataColumns.SPEC_ID.name());
+    }
+
+    int partitionOrdinal() {
+      return metadataSparkType.fieldIndex(MetadataColumns.PARTITION_COLUMN_NAME);
+    }
+
+    int fileOrdinal() {
+      return deleteSparkType.fieldIndex(MetadataColumns.FILE_PATH.name());
+    }
+
+    int positionOrdinal() {
+      return deleteSparkType.fieldIndex(MetadataColumns.ROW_POSITION.name());
+    }
   }
 }