Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/07/05 22:15:59 UTC
[incubator-iceberg] branch master updated: Support dynamic partition overwrite (#246)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new b5c23eb Support dynamic partition overwrite (#246)
b5c23eb is described below
commit b5c23ebe3d2e9c3585b85cdd4c5b5e453a39a3b8
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Fri Jul 5 15:15:55 2019 -0700
Support dynamic partition overwrite (#246)
---
.../apache/iceberg/spark/source/IcebergSource.java | 20 +---
.../iceberg/spark/source/StreamingWriter.java | 19 ++-
.../org/apache/iceberg/spark/source/Writer.java | 54 +++++++--
.../iceberg/spark/source/TestParquetWrite.java | 133 +++++++++++++++++++++
4 files changed, 191 insertions(+), 35 deletions(-)
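For readers skimming the diff: with this change, SaveMode.Overwrite is routed to a ReplacePartitions operation, so an overwrite replaces only the partitions that the incoming rows touch; untouched partitions keep their existing data. Below is a minimal sketch of the usage pattern, mirroring the new tests added in this commit (CONF, SCHEMA, records, and location are stand-ins borrowed from the test setup, not part of the production API):

    // Create a partitioned table backed by HadoopTables (as in TestParquetWrite).
    HadoopTables tables = new HadoopTables(CONF);
    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
    Table table = tables.create(SCHEMA, spec, location.toString());

    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);

    // mode("overwrite") now maps to table.newReplacePartitions():
    // only partitions receiving new rows are replaced.
    df.select("id", "data").write()
        .format("iceberg")
        .mode("overwrite")
        .save(location.toString());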
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 545a1b8..7b7dfa2 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -21,11 +21,9 @@ package org.apache.iceberg.spark.source;
import com.google.common.base.Preconditions;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -49,9 +47,6 @@ import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
-
public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister, StreamWriteSupport {
private SparkSession lazySpark = null;
@@ -74,12 +69,12 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
@Override
public Optional<DataSourceWriter> createWriter(String jobId, StructType dsStruct, SaveMode mode,
DataSourceOptions options) {
- Preconditions.checkArgument(mode == SaveMode.Append, "Save mode %s is not supported", mode);
+ Preconditions.checkArgument(mode == SaveMode.Append || mode == SaveMode.Overwrite,
+ "Save mode %s is not supported", mode);
Configuration conf = new Configuration(lazyBaseConf());
Table table = getTableAndResolveHadoopConfiguration(options, conf);
validateWriteSchema(table.schema(), dsStruct);
- FileFormat format = getFileFormat(table.properties(), options);
- return Optional.of(new Writer(table, format));
+ return Optional.of(new Writer(table, options, mode == SaveMode.Overwrite));
}
@Override
@@ -91,11 +86,10 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
Configuration conf = new Configuration(lazyBaseConf());
Table table = getTableAndResolveHadoopConfiguration(options, conf);
validateWriteSchema(table.schema(), dsStruct);
- FileFormat format = getFileFormat(table.properties(), options);
// Spark 2.4.x passes runId to createStreamWriter instead of real queryId,
// so we fetch it directly from sparkContext to make writes idempotent
String queryId = lazySparkSession().sparkContext().getLocalProperty(StreamExecution.QUERY_ID_KEY());
- return new StreamingWriter(table, format, queryId, mode);
+ return new StreamingWriter(table, options, queryId, mode);
}
protected Table findTable(DataSourceOptions options, Configuration conf) {
@@ -145,12 +139,6 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
.forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
}
- private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
- Optional<String> formatOption = options.get("write-format");
- String format = formatOption.orElse(tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT));
- return FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
- }
-
private void validateWriteSchema(Schema tableSchema, StructType dsStruct) {
Schema dsSchema = SparkSchemaUtil.convert(tableSchema, dsStruct);
List<String> errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema);
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
index d768025..5ae30b6 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
@@ -22,12 +22,12 @@ package org.apache.iceberg.spark.source;
import java.util.Map;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
@@ -43,8 +43,8 @@ public class StreamingWriter extends Writer implements StreamWriter {
private final String queryId;
private final OutputMode mode;
- StreamingWriter(Table table, FileFormat format, String queryId, OutputMode mode) {
- super(table, format);
+ StreamingWriter(Table table, DataSourceOptions options, String queryId, OutputMode mode) {
+ super(table, options, false);
this.queryId = queryId;
this.mode = mode;
}
@@ -68,8 +68,7 @@ public class StreamingWriter extends Writer implements StreamWriter {
overwriteFiles.addFile(file);
numFiles++;
}
- LOG.info("Overwriting files in {} with {} new files", table(), numFiles);
- commit(overwriteFiles, epochId);
+ commit(overwriteFiles, epochId, numFiles, "streaming complete overwrite");
} else {
AppendFiles append = table().newFastAppend();
int numFiles = 0;
@@ -77,18 +76,14 @@ public class StreamingWriter extends Writer implements StreamWriter {
append.appendFile(file);
numFiles++;
}
- LOG.info("Appending {} files to {}", numFiles, table());
- commit(append, epochId);
+ commit(append, epochId, numFiles, "streaming append");
}
}
- private <T> void commit(SnapshotUpdate<T> snapshotUpdate, long epochId) {
+ private <T> void commit(SnapshotUpdate<T> snapshotUpdate, long epochId, int numFiles, String description) {
snapshotUpdate.set(QUERY_ID_PROPERTY, queryId);
snapshotUpdate.set(EPOCH_ID_PROPERTY, Long.toString(epochId));
- long start = System.currentTimeMillis();
- snapshotUpdate.commit();
- long duration = System.currentTimeMillis() - start;
- LOG.info("Committed in {} ms", duration);
+ commitOperation(snapshotUpdate, numFiles, description);
}
@Override
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 6634e12..c149c23 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -28,7 +28,9 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
@@ -38,6 +40,8 @@ import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PendingUpdate;
+import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
@@ -53,6 +57,7 @@ import org.apache.iceberg.spark.data.SparkAvroWriter;
import org.apache.iceberg.spark.data.SparkParquetWriters;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
@@ -68,6 +73,8 @@ import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
// TODO: parameterize DataSourceWriter with subclass of WriterCommitMessage
class Writer implements DataSourceWriter {
@@ -77,12 +84,21 @@ class Writer implements DataSourceWriter {
private final FileFormat format;
private final FileIO fileIo;
private final EncryptionManager encryptionManager;
+ private final boolean replacePartitions;
- Writer(Table table, FileFormat format) {
+ Writer(Table table, DataSourceOptions options, boolean replacePartitions) {
this.table = table;
- this.format = format;
+ this.format = getFileFormat(table.properties(), options);
this.fileIo = table.io();
this.encryptionManager = table.encryption();
+ this.replacePartitions = replacePartitions;
+ }
+
+ private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
+ Optional<String> formatOption = options.get("write-format");
+ String formatString = formatOption
+ .orElse(tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT));
+ return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
}
@Override
@@ -93,6 +109,22 @@ class Writer implements DataSourceWriter {
@Override
public void commit(WriterCommitMessage[] messages) {
+ if (replacePartitions) {
+ replacePartitions(messages);
+ } else {
+ append(messages);
+ }
+ }
+
+ protected void commitOperation(PendingUpdate<?> operation, int numFiles, String description) {
+ LOG.info("Committing {} with {} files to table {}", description, numFiles, table);
+ long start = System.currentTimeMillis();
+ operation.commit(); // abort is automatically called if this fails
+ long duration = System.currentTimeMillis() - start;
+ LOG.info("Committed in {} ms", duration);
+ }
+
+ private void append(WriterCommitMessage[] messages) {
AppendFiles append = table.newAppend();
int numFiles = 0;
@@ -101,11 +133,19 @@ class Writer implements DataSourceWriter {
append.appendFile(file);
}
- LOG.info("Appending {} files to {}", numFiles, table);
- long start = System.currentTimeMillis();
- append.commit(); // abort is automatically called if this fails
- long duration = System.currentTimeMillis() - start;
- LOG.info("Committed in {} ms", duration);
+ commitOperation(append, numFiles, "append");
+ }
+
+ private void replacePartitions(WriterCommitMessage[] messages) {
+ ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
+
+ int numFiles = 0;
+ for (DataFile file : files(messages)) {
+ numFiles += 1;
+ dynamicOverwrite.addFile(file);
+ }
+
+ commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite");
}
@Override
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
index 1c586af..d8d164b 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
@@ -114,4 +114,137 @@ public class TestParquetWrite {
}
}
}
+
+ @Test
+ public void testAppend() throws IOException {
+ File parent = temp.newFolder("parquet");
+ File location = new File(parent, "test");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c")
+ );
+
+ List<SimpleRecord> expected = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c"),
+ new SimpleRecord(4, "a"),
+ new SimpleRecord(5, "b"),
+ new SimpleRecord(6, "c")
+ );
+
+ Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+ df.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(location.toString());
+
+ df.withColumn("id", df.col("id").plus(3)).select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(location.toString());
+
+ table.refresh();
+
+ Dataset<Row> result = spark.read()
+ .format("iceberg")
+ .load(location.toString());
+
+ List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+ Assert.assertEquals("Result rows should match", expected, actual);
+ }
+
+ @Test
+ public void testOverwrite() throws IOException {
+ File parent = temp.newFolder("parquet");
+ File location = new File(parent, "test");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c")
+ );
+
+ List<SimpleRecord> expected = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "a"),
+ new SimpleRecord(3, "c"),
+ new SimpleRecord(4, "b"),
+ new SimpleRecord(6, "c")
+ );
+
+ Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+ df.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(location.toString());
+
+ // overwrite with 2*id to replace record 2, append 4 and 6
+ df.withColumn("id", df.col("id").multiply(2)).select("id", "data").write()
+ .format("iceberg")
+ .mode("overwrite")
+ .save(location.toString());
+
+ table.refresh();
+
+ Dataset<Row> result = spark.read()
+ .format("iceberg")
+ .load(location.toString());
+
+ List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+ Assert.assertEquals("Result rows should match", expected, actual);
+ }
+
+ @Test
+ public void testUnpartitionedOverwrite() throws IOException {
+ File parent = temp.newFolder("parquet");
+ File location = new File(parent, "test");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> expected = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c")
+ );
+
+ Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+ df.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(location.toString());
+
+ // overwrite with the same data; should not produce two copies
+ df.select("id", "data").write()
+ .format("iceberg")
+ .mode("overwrite")
+ .save(location.toString());
+
+ table.refresh();
+
+ Dataset<Row> result = spark.read()
+ .format("iceberg")
+ .load(location.toString());
+
+ List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+ Assert.assertEquals("Result rows should match", expected, actual);
+ }
}