You are viewing a plain-text version of this content. The canonical version is available at its original link, which is not preserved in this plain-text rendering.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/05/19 03:05:35 UTC
[iceberg] branch master updated: Spark 3.4: Minor refactoring for SparkPositionDeltaWrite (#7650)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 24490b0da3 Spark 3.4: Minor refactoring for SparkPositionDeltaWrite (#7650)
24490b0da3 is described below
commit 24490b0da3124bf28817130e64b2fe4db8ecd7fd
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Thu May 18 20:05:27 2023 -0700
Spark 3.4: Minor refactoring for SparkPositionDeltaWrite (#7650)
---
.../spark/source/SparkPositionDeltaWrite.java | 55 +++++++++++++---------
1 file changed, 32 insertions(+), 23 deletions(-)
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 416eb6a9ee..b5139af212 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -269,9 +269,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
extraSnapshotMetadata.forEach(operation::set);
- if (!CommitMetadata.commitProperties().isEmpty()) {
- CommitMetadata.commitProperties().forEach(operation::set);
- }
+ CommitMetadata.commitProperties().forEach(operation::set);
if (wapEnabled && wapId != null) {
// write-audit-publish is enabled for this table and job
@@ -386,10 +384,13 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
protected Map<Integer, StructProjection> buildPartitionProjections(
Types.StructType partitionType, Map<Integer, PartitionSpec> specs) {
Map<Integer, StructProjection> partitionProjections = Maps.newHashMap();
- specs.forEach(
- (specID, spec) ->
- partitionProjections.put(
- specID, StructProjection.create(partitionType, spec.partitionType())));
+
+ for (int specId : specs.keySet()) {
+ PartitionSpec spec = specs.get(specId);
+ StructProjection projection = StructProjection.create(partitionType, spec.partitionType());
+ partitionProjections.put(specId, projection);
+ }
+
return partitionProjections;
}
}
@@ -425,12 +426,10 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
this.partitionRowWrapper = initPartitionRowWrapper(partitionType);
this.partitionProjections = buildPartitionProjections(partitionType, specs);
- this.specIdOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.SPEC_ID.name());
- this.partitionOrdinal =
- context.metadataSparkType().fieldIndex(MetadataColumns.PARTITION_COLUMN_NAME);
- this.fileOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.FILE_PATH.name());
- this.positionOrdinal =
- context.deleteSparkType().fieldIndex(MetadataColumns.ROW_POSITION.name());
+ this.specIdOrdinal = context.specIdOrdinal();
+ this.partitionOrdinal = context.partitionOrdinal();
+ this.fileOrdinal = context.fileOrdinal();
+ this.positionOrdinal = context.positionOrdinal();
}
@Override
@@ -517,12 +516,10 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
this.deletePartitionRowWrapper = initPartitionRowWrapper(partitionType);
this.deletePartitionProjections = buildPartitionProjections(partitionType, specs);
- this.specIdOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.SPEC_ID.name());
- this.partitionOrdinal =
- context.metadataSparkType().fieldIndex(MetadataColumns.PARTITION_COLUMN_NAME);
- this.fileOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.FILE_PATH.name());
- this.positionOrdinal =
- context.deleteSparkType().fieldIndex(MetadataColumns.ROW_POSITION.name());
+ this.specIdOrdinal = context.specIdOrdinal();
+ this.partitionOrdinal = context.partitionOrdinal();
+ this.fileOrdinal = context.fileOrdinal();
+ this.positionOrdinal = context.positionOrdinal();
}
@Override
@@ -713,10 +710,6 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
return deleteSparkType;
}
- StructType metadataSparkType() {
- return metadataSparkType;
- }
-
FileFormat deleteFileFormat() {
return deleteFileFormat;
}
@@ -732,5 +725,21 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
String queryId() {
return queryId;
}
+
+ int specIdOrdinal() {
+ return metadataSparkType.fieldIndex(MetadataColumns.SPEC_ID.name());
+ }
+
+ int partitionOrdinal() {
+ return metadataSparkType.fieldIndex(MetadataColumns.PARTITION_COLUMN_NAME);
+ }
+
+ int fileOrdinal() {
+ return deleteSparkType.fieldIndex(MetadataColumns.FILE_PATH.name());
+ }
+
+ int positionOrdinal() {
+ return deleteSparkType.fieldIndex(MetadataColumns.ROW_POSITION.name());
+ }
}
}