Posted to commits@iceberg.apache.org by op...@apache.org on 2021/04/27 11:11:24 UTC

[iceberg] branch master updated: Spark: Add builder to SparkAppenderFactory to handle increasing number of arguments (#2499)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 172ecb5  Spark: Add builder to SparkAppenderFactory to handle increasing number of arguments (#2499)
172ecb5 is described below

commit 172ecb554f0e78dd374e65cf348f0c56841b81ee
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Tue Apr 27 13:11:12 2021 +0200

    Spark: Add builder to SparkAppenderFactory to handle increasing number of arguments (#2499)
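    
    The builder replaces the previous convenience constructors. A minimal
    usage sketch, pieced together from the call sites in this diff
    ("table", "writeSchema", and "dsSchema" stand in for values already in
    scope at each call site):
    
        SparkAppenderFactory appenderFactory =
            SparkAppenderFactory.builderFor(table, writeSchema, dsSchema)
                .spec(table.spec())  // optional; defaults to table.spec()
                .build();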
---
 .../iceberg/spark/source/RowDataRewriter.java      |  3 +-
 .../iceberg/spark/source/SparkAppenderFactory.java | 72 ++++++++++++++++++----
 .../spark/source/TestSparkAppenderFactory.java     |  6 +-
 .../spark/source/TestSparkMergingMetrics.java      | 26 +++++++-
 .../org/apache/iceberg/spark/source/Writer.java    |  2 +-
 .../apache/iceberg/spark/source/SparkWrite.java    |  2 +-
 6 files changed, 90 insertions(+), 21 deletions(-)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
index 0342c22..8e97af1 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
@@ -86,7 +86,8 @@ public class RowDataRewriter implements Serializable {
     RowDataReader dataReader = new RowDataReader(task, table, schema, caseSensitive);
 
     StructType structType = SparkSchemaUtil.convert(schema);
-    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType, spec);
+    SparkAppenderFactory appenderFactory =
+        SparkAppenderFactory.builderFor(table, schema, structType).spec(spec).build();
     OutputFileFactory fileFactory = new OutputFileFactory(table, spec, format, partitionId, taskId);
 
     TaskWriter<InternalRow> writer;
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
index b97a007..29bb4ed 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -61,19 +61,6 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
   private StructType eqDeleteSparkType = null;
   private StructType posDeleteSparkType = null;
 
-  // TODO: expose a builder like SparkAppenderFactory.forTable()
-  SparkAppenderFactory(Table table, Schema writeSchema, StructType dsSchema) {
-    this(table.properties(), writeSchema, dsSchema, table.spec());
-  }
-
-  SparkAppenderFactory(Map<String, String> properties, Schema writeSchema, StructType dsSchema) {
-    this(properties, writeSchema, dsSchema, PartitionSpec.unpartitioned(), null, null, null);
-  }
-
-  SparkAppenderFactory(Map<String, String> properties, Schema writeSchema, StructType dsSchema, PartitionSpec spec) {
-    this(properties, writeSchema, dsSchema, spec, null, null, null);
-  }
-
   SparkAppenderFactory(Map<String, String> properties, Schema writeSchema, StructType dsSchema, PartitionSpec spec,
                        int[] equalityFieldIds, Schema eqDeleteRowSchema, Schema posDeleteRowSchema) {
     this.properties = properties;
@@ -85,6 +72,64 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
     this.posDeleteRowSchema = posDeleteRowSchema;
   }
 
+  static Builder builderFor(Table table, Schema writeSchema, StructType dsSchema) {
+    return new Builder(table, writeSchema, dsSchema);
+  }
+
+  static class Builder {
+    private final Table table;
+    private final Schema writeSchema;
+    private final StructType dsSchema;
+    private PartitionSpec spec;
+    private int[] equalityFieldIds;
+    private Schema eqDeleteRowSchema;
+    private Schema posDeleteRowSchema;
+
+    Builder(Table table, Schema writeSchema, StructType dsSchema) {
+      this.table = table;
+      this.spec = table.spec();
+      this.writeSchema = writeSchema;
+      this.dsSchema = dsSchema;
+    }
+
+    Builder spec(PartitionSpec newSpec) {
+      this.spec = newSpec;
+      return this;
+    }
+
+    Builder equalityFieldIds(int[] newEqualityFieldIds) {
+      this.equalityFieldIds = newEqualityFieldIds;
+      return this;
+    }
+
+    Builder eqDeleteRowSchema(Schema newEqDeleteRowSchema) {
+      this.eqDeleteRowSchema = newEqDeleteRowSchema;
+      return this;
+    }
+
+    Builder posDelRowSchema(Schema newPosDelRowSchema) {
+      this.posDeleteRowSchema = newPosDelRowSchema;
+      return this;
+    }
+
+    SparkAppenderFactory build() {
+      Preconditions.checkNotNull(table, "Table must not be null");
+      Preconditions.checkNotNull(writeSchema, "Write Schema must not be null");
+      Preconditions.checkNotNull(dsSchema, "DS Schema must not be null");
+      if (equalityFieldIds != null) {
+        Preconditions.checkNotNull(eqDeleteRowSchema, "Equality Field Ids and Equality Delete Row Schema" +
+            " must be set together");
+      }
+      if (eqDeleteRowSchema != null) {
+        Preconditions.checkNotNull(equalityFieldIds, "Equality Field Ids and Equality Delete Row Schema" +
+            " must be set together");
+      }
+
+      return new SparkAppenderFactory(table.properties(), writeSchema, dsSchema, spec, equalityFieldIds,
+          eqDeleteRowSchema, posDeleteRowSchema);
+    }
+  }
+
   private StructType lazyEqDeleteSparkType() {
     if (eqDeleteSparkType == null) {
       Preconditions.checkNotNull(eqDeleteRowSchema, "Equality delete row schema shouldn't be null");
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
index cac021d..bda5257 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
@@ -44,8 +44,10 @@ public class TestSparkAppenderFactory extends TestAppenderFactory<InternalRow> {
   protected FileAppenderFactory<InternalRow> createAppenderFactory(List<Integer> equalityFieldIds,
                                                                    Schema eqDeleteSchema,
                                                                    Schema posDeleteRowSchema) {
-    return new SparkAppenderFactory(table.properties(), table.schema(), sparkType, table.spec(),
-        ArrayUtil.toIntArray(equalityFieldIds), eqDeleteSchema, posDeleteRowSchema);
+    return SparkAppenderFactory.builderFor(table, table.schema(), sparkType)
+        .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
+        .eqDeleteRowSchema(eqDeleteSchema)
+        .posDelRowSchema(posDeleteRowSchema).build();
   }
 
   @Override
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMergingMetrics.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMergingMetrics.java
index 16f25cc..be74d1c 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMergingMetrics.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMergingMetrics.java
@@ -20,9 +20,14 @@
 package org.apache.iceberg.spark.source;
 
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TestMergingMetrics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.FileAppender;
@@ -37,9 +42,24 @@ public class TestSparkMergingMetrics extends TestMergingMetrics<InternalRow> {
 
   @Override
   protected FileAppender<InternalRow> writeAndGetAppender(List<Record> records) throws IOException {
+    Table testTable = new BaseTable(null, "dummy") {
+      @Override
+      public Map<String, String> properties() {
+        return Collections.emptyMap();
+      }
+      @Override
+      public SortOrder sortOrder() {
+        return SortOrder.unsorted();
+      }
+      @Override
+      public PartitionSpec spec() {
+        return PartitionSpec.unpartitioned();
+      }
+    };
+
     FileAppender<InternalRow> appender =
-        new SparkAppenderFactory(new HashMap<>(), SCHEMA, SparkSchemaUtil.convert(SCHEMA)).newAppender(
-            org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat);
+        SparkAppenderFactory.builderFor(testTable, SCHEMA, SparkSchemaUtil.convert(SCHEMA)).build()
+            .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat);
     try (FileAppender<InternalRow> fileAppender = appender) {
       records.stream().map(r -> new StructInternalRow(SCHEMA.asStruct()).setStruct(r)).forEach(fileAppender::add);
     }
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 76455b4..d03365f 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -264,7 +264,7 @@ class Writer implements DataSourceWriter {
       Table table = tableBroadcast.value();
 
       OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);
-      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(table, writeSchema, dsSchema);
+      SparkAppenderFactory appenderFactory = SparkAppenderFactory.builderFor(table, writeSchema, dsSchema).build();
 
       PartitionSpec spec = table.spec();
       FileIO io = table.io();
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 72dd6e2..213fee4 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -533,7 +533,7 @@ class SparkWrite {
       Table table = tableBroadcast.value();
 
       OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);
-      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(table, writeSchema, dsSchema);
+      SparkAppenderFactory appenderFactory = SparkAppenderFactory.builderFor(table, writeSchema, dsSchema).build();
 
       PartitionSpec spec = table.spec();
       FileIO io = table.io();