You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by op...@apache.org on 2021/04/27 11:11:24 UTC
[iceberg] branch master updated: Spark: Add builder to SparkAppenderFactory to handle increasing number of arguments (#2499)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 172ecb5 Spark: Add builder to SparkAppenderFactory to handle increasing number of arguments (#2499)
172ecb5 is described below
commit 172ecb554f0e78dd374e65cf348f0c56841b81ee
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Tue Apr 27 13:11:12 2021 +0200
Spark: Add builder to SparkAppenderFactory to handle increasing number of arguments (#2499)
---
.../iceberg/spark/source/RowDataRewriter.java | 3 +-
.../iceberg/spark/source/SparkAppenderFactory.java | 72 ++++++++++++++++++----
.../spark/source/TestSparkAppenderFactory.java | 6 +-
.../spark/source/TestSparkMergingMetrics.java | 26 +++++++-
.../org/apache/iceberg/spark/source/Writer.java | 2 +-
.../apache/iceberg/spark/source/SparkWrite.java | 2 +-
6 files changed, 90 insertions(+), 21 deletions(-)
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
index 0342c22..8e97af1 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
@@ -86,7 +86,8 @@ public class RowDataRewriter implements Serializable {
RowDataReader dataReader = new RowDataReader(task, table, schema, caseSensitive);
StructType structType = SparkSchemaUtil.convert(schema);
- SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType, spec);
+ SparkAppenderFactory appenderFactory =
+ SparkAppenderFactory.builderFor(table, schema, structType).spec(spec).build();
OutputFileFactory fileFactory = new OutputFileFactory(table, spec, format, partitionId, taskId);
TaskWriter<InternalRow> writer;
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
index b97a007..29bb4ed 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -61,19 +61,6 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
private StructType eqDeleteSparkType = null;
private StructType posDeleteSparkType = null;
- // TODO: expose a builder like SparkAppenderFactory.forTable()
- SparkAppenderFactory(Table table, Schema writeSchema, StructType dsSchema) {
- this(table.properties(), writeSchema, dsSchema, table.spec());
- }
-
- SparkAppenderFactory(Map<String, String> properties, Schema writeSchema, StructType dsSchema) {
- this(properties, writeSchema, dsSchema, PartitionSpec.unpartitioned(), null, null, null);
- }
-
- SparkAppenderFactory(Map<String, String> properties, Schema writeSchema, StructType dsSchema, PartitionSpec spec) {
- this(properties, writeSchema, dsSchema, spec, null, null, null);
- }
-
SparkAppenderFactory(Map<String, String> properties, Schema writeSchema, StructType dsSchema, PartitionSpec spec,
int[] equalityFieldIds, Schema eqDeleteRowSchema, Schema posDeleteRowSchema) {
this.properties = properties;
@@ -85,6 +72,65 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
this.posDeleteRowSchema = posDeleteRowSchema;
}
+  /**
+   * Starts a {@link Builder} for a {@link SparkAppenderFactory}.
+   *
+   * @param table table whose properties (and, unless overridden, partition spec) configure the factory
+   * @param writeSchema Iceberg schema of the rows to be written
+   * @param dsSchema Spark type of the incoming rows
+   * @return a new builder; call {@code build()} to obtain the factory
+   */
+  static Builder builderFor(Table table, Schema writeSchema, StructType dsSchema) {
+    Builder builder = new Builder(table, writeSchema, dsSchema);
+    return builder;
+  }
+
+  /**
+   * Builder for {@link SparkAppenderFactory}, replacing the former telescoping constructors.
+   *
+   * <p>The table, write schema and Spark schema are required. The partition spec defaults to the
+   * table's spec at the time the builder is created. The equality-delete and position-delete settings
+   * are left null unless set.
+   */
+  static class Builder {
+    private static final String EQ_DELETE_ARGS_MSG =
+        "Equality Field Ids and Equality Delete Row Schema must be set together";
+
+    private final Table table;
+    private final Schema writeSchema;
+    private final StructType dsSchema;
+    private PartitionSpec spec;
+    private int[] equalityFieldIds;
+    private Schema eqDeleteRowSchema;
+    private Schema posDeleteRowSchema;
+
+    Builder(Table table, Schema writeSchema, StructType dsSchema) {
+      // Validate eagerly: the default spec below dereferences the table, so a null check deferred to
+      // build() would never be reached and callers would get an NPE without a message.
+      this.table = Preconditions.checkNotNull(table, "Table must not be null");
+      this.writeSchema = Preconditions.checkNotNull(writeSchema, "Write Schema must not be null");
+      this.dsSchema = Preconditions.checkNotNull(dsSchema, "DS Schema must not be null");
+      this.spec = table.spec();
+    }
+
+    /** Overrides the partition spec (defaults to the table's spec). */
+    Builder spec(PartitionSpec newSpec) {
+      this.spec = newSpec;
+      return this;
+    }
+
+    /** Sets the equality field ids; must be set together with {@link #eqDeleteRowSchema(Schema)}. */
+    Builder equalityFieldIds(int[] newEqualityFieldIds) {
+      this.equalityFieldIds = newEqualityFieldIds;
+      return this;
+    }
+
+    /** Sets the equality delete row schema; must be set together with {@link #equalityFieldIds(int[])}. */
+    Builder eqDeleteRowSchema(Schema newEqDeleteRowSchema) {
+      this.eqDeleteRowSchema = newEqDeleteRowSchema;
+      return this;
+    }
+
+    /** Sets the position delete row schema (optional). */
+    Builder posDelRowSchema(Schema newPosDelRowSchema) {
+      this.posDeleteRowSchema = newPosDelRowSchema;
+      return this;
+    }
+
+    /**
+     * Creates the factory from the table's properties and the configured settings.
+     *
+     * @throws NullPointerException if exactly one of the equality field ids and the equality delete row
+     *     schema has been set
+     */
+    SparkAppenderFactory build() {
+      if (equalityFieldIds != null || eqDeleteRowSchema != null) {
+        // Once either equality-delete setting is present, both are required.
+        Preconditions.checkNotNull(equalityFieldIds, EQ_DELETE_ARGS_MSG);
+        Preconditions.checkNotNull(eqDeleteRowSchema, EQ_DELETE_ARGS_MSG);
+      }
+
+      return new SparkAppenderFactory(table.properties(), writeSchema, dsSchema, spec, equalityFieldIds,
+          eqDeleteRowSchema, posDeleteRowSchema);
+    }
+  }
+
private StructType lazyEqDeleteSparkType() {
if (eqDeleteSparkType == null) {
Preconditions.checkNotNull(eqDeleteRowSchema, "Equality delete row schema shouldn't be null");
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
index cac021d..bda5257 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
@@ -44,8 +44,10 @@ public class TestSparkAppenderFactory extends TestAppenderFactory<InternalRow> {
protected FileAppenderFactory<InternalRow> createAppenderFactory(List<Integer> equalityFieldIds,
Schema eqDeleteSchema,
Schema posDeleteRowSchema) {
- return new SparkAppenderFactory(table.properties(), table.schema(), sparkType, table.spec(),
- ArrayUtil.toIntArray(equalityFieldIds), eqDeleteSchema, posDeleteRowSchema);
+ return SparkAppenderFactory.builderFor(table, table.schema(), sparkType)
+ .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
+ .eqDeleteRowSchema(eqDeleteSchema)
+ .posDelRowSchema(posDeleteRowSchema).build();
}
@Override
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMergingMetrics.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMergingMetrics.java
index 16f25cc..be74d1c 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMergingMetrics.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMergingMetrics.java
@@ -20,9 +20,14 @@
package org.apache.iceberg.spark.source;
import java.io.IOException;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.BaseTable;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TestMergingMetrics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppender;
@@ -37,9 +42,24 @@ public class TestSparkMergingMetrics extends TestMergingMetrics<InternalRow> {
@Override
protected FileAppender<InternalRow> writeAndGetAppender(List<Record> records) throws IOException {
+ Table testTable = new BaseTable(null, "dummy") {
+ @Override
+ public Map<String, String> properties() {
+ return Collections.emptyMap();
+ }
+ @Override
+ public SortOrder sortOrder() {
+ return SortOrder.unsorted();
+ }
+ @Override
+ public PartitionSpec spec() {
+ return PartitionSpec.unpartitioned();
+ }
+ };
+
FileAppender<InternalRow> appender =
- new SparkAppenderFactory(new HashMap<>(), SCHEMA, SparkSchemaUtil.convert(SCHEMA)).newAppender(
- org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat);
+ SparkAppenderFactory.builderFor(testTable, SCHEMA, SparkSchemaUtil.convert(SCHEMA)).build()
+ .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat);
try (FileAppender<InternalRow> fileAppender = appender) {
records.stream().map(r -> new StructInternalRow(SCHEMA.asStruct()).setStruct(r)).forEach(fileAppender::add);
}
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 76455b4..d03365f 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -264,7 +264,7 @@ class Writer implements DataSourceWriter {
Table table = tableBroadcast.value();
OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);
- SparkAppenderFactory appenderFactory = new SparkAppenderFactory(table, writeSchema, dsSchema);
+ SparkAppenderFactory appenderFactory = SparkAppenderFactory.builderFor(table, writeSchema, dsSchema).build();
PartitionSpec spec = table.spec();
FileIO io = table.io();
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 72dd6e2..213fee4 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -533,7 +533,7 @@ class SparkWrite {
Table table = tableBroadcast.value();
OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);
- SparkAppenderFactory appenderFactory = new SparkAppenderFactory(table, writeSchema, dsSchema);
+ SparkAppenderFactory appenderFactory = SparkAppenderFactory.builderFor(table, writeSchema, dsSchema).build();
PartitionSpec spec = table.spec();
FileIO io = table.io();