Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/07/05 22:15:59 UTC

[incubator-iceberg] branch master updated: Support dynamic partition overwrite (#246)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new b5c23eb  Support dynamic partition overwrite (#246)
b5c23eb is described below

commit b5c23ebe3d2e9c3585b85cdd4c5b5e453a39a3b8
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Fri Jul 5 15:15:55 2019 -0700

    Support dynamic partition overwrite (#246)
---
 .../apache/iceberg/spark/source/IcebergSource.java |  20 +---
 .../iceberg/spark/source/StreamingWriter.java      |  19 ++-
 .../org/apache/iceberg/spark/source/Writer.java    |  54 +++++++--
 .../iceberg/spark/source/TestParquetWrite.java     | 133 +++++++++++++++++++++
 4 files changed, 191 insertions(+), 35 deletions(-)
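
For context, the user-facing effect of this change is that SaveMode.Overwrite now performs
a dynamic partition overwrite: only the partitions present in the incoming data are replaced,
and untouched partitions are left as they were. A minimal sketch of the batch write path,
based on the new tests further down (the spark session, SimpleRecord, and location come from
that test code and are illustrative here):

    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);

    df.select("id", "data").write()
        .format("iceberg")
        .mode("overwrite")              // now maps to table.newReplacePartitions()
        .save(location.toString());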

diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 545a1b8..7b7dfa2 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -21,11 +21,9 @@ package org.apache.iceberg.spark.source;
 
 import com.google.common.base.Preconditions;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -49,9 +47,6 @@ import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
 import org.apache.spark.sql.streaming.OutputMode;
 import org.apache.spark.sql.types.StructType;
 
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
-
 public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister, StreamWriteSupport {
 
   private SparkSession lazySpark = null;
@@ -74,12 +69,12 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
   @Override
   public Optional<DataSourceWriter> createWriter(String jobId, StructType dsStruct, SaveMode mode,
                                                  DataSourceOptions options) {
-    Preconditions.checkArgument(mode == SaveMode.Append, "Save mode %s is not supported", mode);
+    Preconditions.checkArgument(mode == SaveMode.Append || mode == SaveMode.Overwrite,
+        "Save mode %s is not supported", mode);
     Configuration conf = new Configuration(lazyBaseConf());
     Table table = getTableAndResolveHadoopConfiguration(options, conf);
     validateWriteSchema(table.schema(), dsStruct);
-    FileFormat format = getFileFormat(table.properties(), options);
-    return Optional.of(new Writer(table, format));
+    return Optional.of(new Writer(table, options, mode == SaveMode.Overwrite));
   }
 
   @Override
@@ -91,11 +86,10 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
     Configuration conf = new Configuration(lazyBaseConf());
     Table table = getTableAndResolveHadoopConfiguration(options, conf);
     validateWriteSchema(table.schema(), dsStruct);
-    FileFormat format = getFileFormat(table.properties(), options);
     // Spark 2.4.x passes runId to createStreamWriter instead of real queryId,
     // so we fetch it directly from sparkContext to make writes idempotent
     String queryId = lazySparkSession().sparkContext().getLocalProperty(StreamExecution.QUERY_ID_KEY());
-    return new StreamingWriter(table, format, queryId, mode);
+    return new StreamingWriter(table, options, queryId, mode);
   }
 
   protected Table findTable(DataSourceOptions options, Configuration conf) {
@@ -145,12 +139,6 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
         .forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
   }
 
-  private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
-    Optional<String> formatOption = options.get("write-format");
-    String format = formatOption.orElse(tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT));
-    return FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
-  }
-
   private void validateWriteSchema(Schema tableSchema, StructType dsStruct) {
     Schema dsSchema = SparkSchemaUtil.convert(tableSchema, dsStruct);
     List<String> errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema);
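
Note that file-format resolution moves out of IcebergSource and into Writer, which now reads
it from DataSourceOptions. A hedged sketch of choosing the format per write, assuming
DEFAULT_FILE_FORMAT resolves to the write.format.default table property with a parquet
default (the "write-format" option key comes from the code below; the value shown is
illustrative):

    df.select("id", "data").write()
        .format("iceberg")
        .option("write-format", "avro")  // falls back to write.format.default, then parquet
        .mode("append")
        .save(location.toString());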
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
index d768025..5ae30b6 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
@@ -22,12 +22,12 @@ package org.apache.iceberg.spark.source;
 import java.util.Map;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.OverwriteFiles;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
 import org.apache.spark.sql.streaming.OutputMode;
@@ -43,8 +43,8 @@ public class StreamingWriter extends Writer implements StreamWriter {
   private final String queryId;
   private final OutputMode mode;
 
-  StreamingWriter(Table table, FileFormat format, String queryId, OutputMode mode) {
-    super(table, format);
+  StreamingWriter(Table table, DataSourceOptions options, String queryId, OutputMode mode) {
+    super(table, options, false);
     this.queryId = queryId;
     this.mode = mode;
   }
@@ -68,8 +68,7 @@ public class StreamingWriter extends Writer implements StreamWriter {
         overwriteFiles.addFile(file);
         numFiles++;
       }
-      LOG.info("Overwriting files in {} with {} new files", table(), numFiles);
-      commit(overwriteFiles, epochId);
+      commit(overwriteFiles, epochId, numFiles, "streaming complete overwrite");
     } else {
       AppendFiles append = table().newFastAppend();
       int numFiles = 0;
@@ -77,18 +76,14 @@ public class StreamingWriter extends Writer implements StreamWriter {
         append.appendFile(file);
         numFiles++;
       }
-      LOG.info("Appending {} files to {}", numFiles, table());
-      commit(append, epochId);
+      commit(append, epochId, numFiles, "streaming append");
     }
   }
 
-  private <T> void commit(SnapshotUpdate<T> snapshotUpdate, long epochId) {
+  private <T> void commit(SnapshotUpdate<T> snapshotUpdate, long epochId, int numFiles, String description) {
     snapshotUpdate.set(QUERY_ID_PROPERTY, queryId);
     snapshotUpdate.set(EPOCH_ID_PROPERTY, Long.toString(epochId));
-    long start = System.currentTimeMillis();
-    snapshotUpdate.commit();
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    commitOperation(snapshotUpdate, numFiles, description);
   }
 
   @Override
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 6634e12..c149c23 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -28,7 +28,9 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
@@ -38,6 +40,8 @@ import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PendingUpdate;
+import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.Avro;
@@ -53,6 +57,7 @@ import org.apache.iceberg.spark.data.SparkAvroWriter;
 import org.apache.iceberg.spark.data.SparkParquetWriters;
 import org.apache.iceberg.util.Tasks;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.sources.v2.writer.DataWriter;
 import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
@@ -68,6 +73,8 @@ import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
 import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
 import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
 import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
 
 // TODO: parameterize DataSourceWriter with subclass of WriterCommitMessage
 class Writer implements DataSourceWriter {
@@ -77,12 +84,21 @@ class Writer implements DataSourceWriter {
   private final FileFormat format;
   private final FileIO fileIo;
   private final EncryptionManager encryptionManager;
+  private final boolean replacePartitions;
 
-  Writer(Table table, FileFormat format) {
+  Writer(Table table, DataSourceOptions options, boolean replacePartitions) {
     this.table = table;
-    this.format = format;
+    this.format = getFileFormat(table.properties(), options);
     this.fileIo = table.io();
     this.encryptionManager = table.encryption();
+    this.replacePartitions = replacePartitions;
+  }
+
+  private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
+    Optional<String> formatOption = options.get("write-format");
+    String formatString = formatOption
+        .orElse(tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT));
+    return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
   }
 
   @Override
@@ -93,6 +109,22 @@ class Writer implements DataSourceWriter {
 
   @Override
   public void commit(WriterCommitMessage[] messages) {
+    if (replacePartitions) {
+      replacePartitions(messages);
+    } else {
+      append(messages);
+    }
+  }
+
+  protected void commitOperation(PendingUpdate<?> operation, int numFiles, String description) {
+    LOG.info("Committing {} with {} files to table {}", description, numFiles, table);
+    long start = System.currentTimeMillis();
+    operation.commit(); // abort is automatically called if this fails
+    long duration = System.currentTimeMillis() - start;
+    LOG.info("Committed in {} ms", duration);
+  }
+
+  private void append(WriterCommitMessage[] messages) {
     AppendFiles append = table.newAppend();
 
     int numFiles = 0;
@@ -101,11 +133,19 @@ class Writer implements DataSourceWriter {
       append.appendFile(file);
     }
 
-    LOG.info("Appending {} files to {}", numFiles, table);
-    long start = System.currentTimeMillis();
-    append.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    commitOperation(append, numFiles, "append");
+  }
+
+  private void replacePartitions(WriterCommitMessage[] messages) {
+    ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
+
+    int numFiles = 0;
+    for (DataFile file : files(messages)) {
+      numFiles += 1;
+      dynamicOverwrite.addFile(file);
+    }
+
+    commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite");
   }
 
   @Override
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
index 1c586af..d8d164b 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
@@ -114,4 +114,137 @@ public class TestParquetWrite {
       }
     }
   }
+
+  @Test
+  public void testAppend() throws IOException {
+    File parent = temp.newFolder("parquet");
+    File location = new File(parent, "test");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    List<SimpleRecord> expected = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c"),
+        new SimpleRecord(4, "a"),
+        new SimpleRecord(5, "b"),
+        new SimpleRecord(6, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(location.toString());
+
+    df.withColumn("id", df.col("id").plus(3)).select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(location.toString());
+
+    table.refresh();
+
+    Dataset<Row> result = spark.read()
+        .format("iceberg")
+        .load(location.toString());
+
+    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+    Assert.assertEquals("Result rows should match", expected, actual);
+  }
+
+  @Test
+  public void testOverwrite() throws IOException {
+    File parent = temp.newFolder("parquet");
+    File location = new File(parent, "test");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    List<SimpleRecord> expected = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "a"),
+        new SimpleRecord(3, "c"),
+        new SimpleRecord(4, "b"),
+        new SimpleRecord(6, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(location.toString());
+
+    // overwrite with 2*id to replace record 2, append 4 and 6
+    df.withColumn("id", df.col("id").multiply(2)).select("id", "data").write()
+        .format("iceberg")
+        .mode("overwrite")
+        .save(location.toString());
+
+    table.refresh();
+
+    Dataset<Row> result = spark.read()
+        .format("iceberg")
+        .load(location.toString());
+
+    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+    Assert.assertEquals("Result rows should match", expected, actual);
+  }
+
+  @Test
+  public void testUnpartitionedOverwrite() throws IOException {
+    File parent = temp.newFolder("parquet");
+    File location = new File(parent, "test");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> expected = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(location.toString());
+
+    // overwrite with the same data; should not produce two copies
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("overwrite")
+        .save(location.toString());
+
+    table.refresh();
+
+    Dataset<Row> result = spark.read()
+        .format("iceberg")
+        .load(location.toString());
+
+    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+    Assert.assertEquals("Result rows should match", expected, actual);
+  }
 }