Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/09/24 23:24:21 UTC
[incubator-iceberg] branch master updated: Spark: Allow limiting output data file size (#432)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c2435d6 Spark: Allow limiting output data file size (#432)
c2435d6 is described below
commit c2435d64709b9657768f35ae5048c9ef4a352716
Author: Xabriel J. Collazo Mojica <xc...@adobe.com>
AuthorDate: Tue Sep 24 16:24:16 2019 -0700
Spark: Allow limiting output data file size (#432)
---
.../java/org/apache/iceberg/TableProperties.java | 3 +
.../org/apache/iceberg/parquet/ParquetWriter.java | 2 +-
site/docs/configuration.md | 2 +
.../org/apache/iceberg/spark/source/Writer.java | 276 ++++++++++++---------
.../iceberg/spark/source/TestParquetWrite.java | 91 +++++++
5 files changed, 261 insertions(+), 113 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 88057bf..7479fd6 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -100,4 +100,7 @@ public class TableProperties {
public static final String WRITE_AUDIT_PUBLISH_ENABLED = "write.wap.enabled";
public static final String WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT = "false";
+
+ public static final String WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes";
+ public static final long WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = Long.MAX_VALUE;
}
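
The new property defaults to Long.MAX_VALUE, so existing tables keep the current behavior of one data file per writer task; files only start rolling once a smaller target is set. A minimal sketch of setting it on an existing table, mirroring the new test further down (the Table handle and the 128 MB value are illustrative only):

    // Illustrative: cap data files for this table at roughly 128 MB
    table.updateProperties()
        .set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, String.valueOf(128 * 1024 * 1024))
        .commit();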
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 593de9b..14838d5 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -126,7 +126,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
@Override
public long length() {
try {
- return writer.getPos();
+ return writer.getPos() + writeStore.getBufferedSize();
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to get file length");
}
diff --git a/site/docs/configuration.md b/site/docs/configuration.md
index 3e939fe..419ef22 100644
--- a/site/docs/configuration.md
+++ b/site/docs/configuration.md
@@ -25,6 +25,7 @@ Iceberg tables support table properties to configure table behavior, like the de
| write.metadata.compression-codec | none | Metadata compression codec; none or gzip |
| write.metadata.metrics.default | truncate(16) | Default metrics mode for all columns in the table; none, counts, truncate(length), or full |
| write.metadata.metrics.column.col1 | (not set) | Metrics mode for column 'col1' to allow per-column tuning; none, counts, truncate(length), or full |
+| write.target-file-size-bytes | Long.MAX_VALUE | Target size of generated data files; the writer aims for about this many bytes per file |
### Table behavior properties
@@ -75,4 +76,5 @@ df.write
| Spark option | Default | Description |
| ------------ | -------------------------- | ------------------------------------------------------------ |
| write-format | Table write.format.default | File format to use for this write operation; parquet or avro |
+| target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
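
The two knobs work together: the table property sets a per-table default, and the new Spark write option overrides it for a single write. A hedged example of the per-write override using the Java API (the DataFrame df, the 512 MB value, and the table location are placeholders, not part of the commit):

    // Illustrative per-write override of write.target-file-size-bytes
    df.write()
        .format("iceberg")
        .mode("append")
        .option("target-file-size-bytes", 512 * 1024 * 1024)  // aim for ~512 MB data files
        .save("/path/to/table");                               // placeholder table location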
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 889118b..b05fe3b 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -19,12 +19,10 @@
package org.apache.iceberg.spark.source;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@@ -33,7 +31,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
-import java.util.function.Function;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
@@ -57,6 +54,7 @@ import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.spark.data.SparkAvroWriter;
import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -77,6 +75,8 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
// TODO: parameterize DataSourceWriter with subclass of WriterCommitMessage
class Writer implements DataSourceWriter {
@@ -89,6 +89,7 @@ class Writer implements DataSourceWriter {
private final boolean replacePartitions;
private final String applicationId;
private final String wapId;
+ private final long targetFileSize;
Writer(Table table, DataSourceOptions options, boolean replacePartitions, String applicationId) {
this(table, options, replacePartitions, applicationId, null);
@@ -102,6 +103,10 @@ class Writer implements DataSourceWriter {
this.replacePartitions = replacePartitions;
this.applicationId = applicationId;
this.wapId = wapId;
+
+ long tableTargetFileSize = PropertyUtil.propertyAsLong(
+ table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+ this.targetFileSize = options.getLong("target-file-size-bytes", tableTargetFileSize);
}
private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
@@ -119,7 +124,7 @@ class Writer implements DataSourceWriter {
@Override
public DataWriterFactory<InternalRow> createWriterFactory() {
return new WriterFactory(
- table.spec(), format, table.locationProvider(), table.properties(), fileIo, encryptionManager);
+ table.spec(), format, table.locationProvider(), table.properties(), fileIo, encryptionManager, targetFileSize);
}
@Override
@@ -242,34 +247,31 @@ class Writer implements DataSourceWriter {
private final FileFormat format;
private final LocationProvider locations;
private final Map<String, String> properties;
- private final String uuid = UUID.randomUUID().toString();
private final FileIO fileIo;
private final EncryptionManager encryptionManager;
+ private final long targetFileSize;
WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
- Map<String, String> properties, FileIO fileIo, EncryptionManager encryptionManager) {
+ Map<String, String> properties, FileIO fileIo, EncryptionManager encryptionManager,
+ long targetFileSize) {
this.spec = spec;
this.format = format;
this.locations = locations;
this.properties = properties;
this.fileIo = fileIo;
this.encryptionManager = encryptionManager;
+ this.targetFileSize = targetFileSize;
}
@Override
public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
- String filename = format.addExtension(String.format("%05d-%d-%s", partitionId, taskId, uuid));
- AppenderFactory<InternalRow> factory = new SparkAppenderFactory();
+ OutputFileFactory fileFactory = new OutputFileFactory(partitionId, taskId, epochId);
+ AppenderFactory<InternalRow> appenderFactory = new SparkAppenderFactory();
+
if (spec.fields().isEmpty()) {
- OutputFile outputFile = fileIo.newOutputFile(locations.newDataLocation(filename));
- return new UnpartitionedWriter(encryptionManager.encrypt(outputFile), format, factory, fileIo);
+ return new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
} else {
- Function<PartitionKey, EncryptedOutputFile> newOutputFileForKey =
- key -> {
- OutputFile rawOutputFile = fileIo.newOutputFile(locations.newDataLocation(spec, key, filename));
- return encryptionManager.encrypt(rawOutputFile);
- };
- return new PartitionedWriter(spec, format, factory, newOutputFileForKey, fileIo);
+ return new PartitionedWriter(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
}
}
@@ -305,142 +307,119 @@ class Writer implements DataSourceWriter {
}
}
}
- }
-
- private interface AppenderFactory<T> {
- FileAppender<T> newAppender(OutputFile file, FileFormat format);
- }
-
- private static class UnpartitionedWriter implements DataWriter<InternalRow>, Closeable {
- private final FileIO fileIo;
- private FileAppender<InternalRow> appender = null;
- private Metrics metrics = null;
- private List<Long> offsetRanges = null;
- private final EncryptedOutputFile file;
-
- UnpartitionedWriter(
- EncryptedOutputFile outputFile,
- FileFormat format,
- AppenderFactory<InternalRow> factory,
- FileIO fileIo) {
- this.fileIo = fileIo;
- this.file = outputFile;
- this.appender = factory.newAppender(file.encryptingOutputFile(), format);
- }
- @Override
- public void write(InternalRow record) {
- appender.add(record);
- }
-
- @Override
- public WriterCommitMessage commit() throws IOException {
- Preconditions.checkArgument(appender != null, "Commit called on a closed writer: %s", this);
-
- // metrics and splitOffsets are populated on close
- close();
-
- if (metrics.recordCount() == 0L) {
- fileIo.deleteFile(file.encryptingOutputFile());
- return new TaskCommit();
+ private class OutputFileFactory {
+ private final int partitionId;
+ private final long taskId;
+ private final long epochId;
+ // The purpose of this uuid is to make it possible to tell, from two file paths, that they were written by the same operation.
+ // That is useful, for example, when a Spark job dies and leaves files in the file system: a recursive listing
+ // and a grep for the uuid will identify them all.
+ private final String uuid = UUID.randomUUID().toString();
+ private int fileCount;
+
+ OutputFileFactory(int partitionId, long taskId, long epochId) {
+ this.partitionId = partitionId;
+ this.taskId = taskId;
+ this.epochId = epochId;
+ this.fileCount = 0;
}
- DataFile dataFile = DataFiles.fromEncryptedOutputFile(file, null, metrics, offsetRanges);
-
- return new TaskCommit(dataFile);
- }
-
- @Override
- public void abort() throws IOException {
- Preconditions.checkArgument(appender != null, "Abort called on a closed writer: %s", this);
+ private String generateFilename() {
+ return format.addExtension(String.format("%05d-%d-%s-%05d", partitionId, taskId, uuid, fileCount++));
+ }
- close();
- fileIo.deleteFile(file.encryptingOutputFile());
- }
+ /**
+ * Generates EncryptedOutputFile for UnpartitionedWriter.
+ */
+ public EncryptedOutputFile newOutputFile() {
+ OutputFile file = fileIo.newOutputFile(locations.newDataLocation(generateFilename()));
+ return encryptionManager.encrypt(file);
+ }
- @Override
- public void close() throws IOException {
- if (this.appender != null) {
- this.appender.close();
- this.metrics = appender.metrics();
- this.offsetRanges = appender.splitOffsets();
- this.appender = null;
+ /**
+ * Generates EncryptedOutputFile for PartitionedWriter.
+ */
+ public EncryptedOutputFile newOutputFile(PartitionKey key) {
+ OutputFile rawOutputFile = fileIo.newOutputFile(locations.newDataLocation(spec, key, generateFilename()));
+ return encryptionManager.encrypt(rawOutputFile);
}
}
}
- private static class PartitionedWriter implements DataWriter<InternalRow> {
- private final Set<PartitionKey> completedPartitions = Sets.newHashSet();
+ private interface AppenderFactory<T> {
+ FileAppender<T> newAppender(OutputFile file, FileFormat format);
+ }
+
+ private abstract static class BaseWriter implements DataWriter<InternalRow> {
+ protected static final int ROWS_DIVISOR = 1000;
+
private final List<DataFile> completedFiles = Lists.newArrayList();
private final PartitionSpec spec;
private final FileFormat format;
- private final AppenderFactory<InternalRow> factory;
- private final Function<PartitionKey, EncryptedOutputFile> newOutputFileForKey;
- private final PartitionKey key;
+ private final AppenderFactory<InternalRow> appenderFactory;
+ private final WriterFactory.OutputFileFactory fileFactory;
private final FileIO fileIo;
-
+ private final long targetFileSize;
private PartitionKey currentKey = null;
private FileAppender<InternalRow> currentAppender = null;
private EncryptedOutputFile currentFile = null;
+ private long currentRows = 0;
- PartitionedWriter(
- PartitionSpec spec,
- FileFormat format,
- AppenderFactory<InternalRow> factory,
- Function<PartitionKey, EncryptedOutputFile> newOutputFileForKey,
- FileIO fileIo) {
+ BaseWriter(PartitionSpec spec, FileFormat format, AppenderFactory<InternalRow> appenderFactory,
+ WriterFactory.OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
this.spec = spec;
this.format = format;
- this.factory = factory;
- this.newOutputFileForKey = newOutputFileForKey;
- this.key = new PartitionKey(spec);
+ this.appenderFactory = appenderFactory;
+ this.fileFactory = fileFactory;
this.fileIo = fileIo;
+ this.targetFileSize = targetFileSize;
}
@Override
- public void write(InternalRow row) throws IOException {
- key.partition(row);
+ public abstract void write(InternalRow row) throws IOException;
- if (!key.equals(currentKey)) {
+ public void writeInternal(InternalRow row) throws IOException {
+ if (currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize) {
closeCurrent();
-
- if (completedPartitions.contains(key)) {
- // if rows are not correctly grouped, detect and fail the write
- PartitionKey existingKey = Iterables.find(completedPartitions, key::equals, null);
- LOG.warn("Duplicate key: {} == {}", existingKey, key);
- throw new IllegalStateException("Already closed file for partition: " + key.toPath());
- }
-
- this.currentKey = key.copy();
- this.currentFile = newOutputFileForKey.apply(currentKey);
- this.currentAppender = factory.newAppender(currentFile.encryptingOutputFile(), format);
+ openCurrent();
}
currentAppender.add(row);
+ currentRows++;
}
@Override
public WriterCommitMessage commit() throws IOException {
closeCurrent();
+
return new TaskCommit(completedFiles);
}
@Override
public void abort() throws IOException {
+ closeCurrent();
+
// clean up files created by this writer
Tasks.foreach(completedFiles)
.throwFailureWhenFinished()
.noRetry()
.run(file -> fileIo.deleteFile(file.path().toString()));
+ }
- if (currentAppender != null) {
- currentAppender.close();
- this.currentAppender = null;
- fileIo.deleteFile(currentFile.encryptingOutputFile());
+ protected void openCurrent() {
+ if (spec.fields().size() == 0) {
+ // unpartitioned
+ currentFile = fileFactory.newOutputFile();
+ } else {
+ // partitioned
+ currentFile = fileFactory.newOutputFile(currentKey);
}
+ currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format);
+ currentRows = 0;
}
- private void closeCurrent() throws IOException {
+ protected void closeCurrent() throws IOException {
if (currentAppender != null) {
currentAppender.close();
// metrics are only valid after the appender is closed
@@ -448,16 +427,89 @@ class Writer implements DataSourceWriter {
List<Long> splitOffsets = currentAppender.splitOffsets();
this.currentAppender = null;
- DataFile dataFile = DataFiles.builder(spec)
- .withEncryptedOutputFile(currentFile)
- .withPartition(currentKey)
- .withMetrics(metrics)
- .withSplitOffsets(splitOffsets)
- .build();
+ if (metrics.recordCount() == 0L) {
+ fileIo.deleteFile(currentFile.encryptingOutputFile());
+ } else {
+ DataFile dataFile = DataFiles.builder(spec)
+ .withEncryptedOutputFile(currentFile)
+ .withPartition(spec.fields().size() == 0 ? null : currentKey) // set null if unpartitioned
+ .withMetrics(metrics)
+ .withSplitOffsets(splitOffsets)
+ .build();
+ completedFiles.add(dataFile);
+ }
+
+ this.currentFile = null;
+ }
+ }
+
+ protected PartitionKey getCurrentKey() {
+ return currentKey;
+ }
+
+ protected void setCurrentKey(PartitionKey currentKey) {
+ this.currentKey = currentKey;
+ }
+ }
+
+ private static class UnpartitionedWriter extends BaseWriter {
+ private static final int ROWS_DIVISOR = 1000;
+
+ UnpartitionedWriter(
+ PartitionSpec spec,
+ FileFormat format,
+ AppenderFactory<InternalRow> appenderFactory,
+ WriterFactory.OutputFileFactory fileFactory,
+ FileIO fileIo,
+ long targetFileSize) {
+ super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
+
+ openCurrent();
+ }
+
+ @Override
+ public void write(InternalRow row) throws IOException {
+ writeInternal(row);
+ }
+ }
+
+ private static class PartitionedWriter extends BaseWriter {
+ private final PartitionKey key;
+ private final Set<PartitionKey> completedPartitions = Sets.newHashSet();
+
+ PartitionedWriter(
+ PartitionSpec spec,
+ FileFormat format,
+ AppenderFactory<InternalRow> appenderFactory,
+ WriterFactory.OutputFileFactory fileFactory,
+ FileIO fileIo,
+ long targetFileSize) {
+ super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
+
+ this.key = new PartitionKey(spec);
+ }
+
+ @Override
+ public void write(InternalRow row) throws IOException {
+ key.partition(row);
+ PartitionKey currentKey = getCurrentKey();
+ if (!key.equals(currentKey)) {
+ closeCurrent();
completedPartitions.add(currentKey);
- completedFiles.add(dataFile);
+
+ if (completedPartitions.contains(key)) {
+ // if rows are not correctly grouped, detect and fail the write
+ PartitionKey existingKey = Iterables.find(completedPartitions, key::equals, null);
+ LOG.warn("Duplicate key: {} == {}", existingKey, key);
+ throw new IllegalStateException("Already closed files for partition: " + key.toPath());
+ }
+
+ setCurrentKey(key.copy());
+ openCurrent();
}
+
+ writeInternal(row);
}
}
}
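
The core of the Writer.java change is the shared BaseWriter: instead of keeping one appender open per task (unpartitioned) or per partition key, the writer now checks its size once every ROWS_DIVISOR (1000) rows and rolls to a new file from the OutputFileFactory when the target is reached. A condensed restatement of that check, pulled out of the diff for readability (field names as in BaseWriter above):

    // check the current file size only once every ROWS_DIVISOR rows, so that
    // FileAppender.length() is not called for every single record
    if (currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize) {
      closeCurrent();  // close the file, collect metrics, and record the DataFile
      openCurrent();   // open the next file, numbered by OutputFileFactory's fileCount
    }
    currentAppender.add(row);
    currentRows++;

Because the check only runs at 1000-row boundaries, a file can overshoot the target by roughly a thousand rows' worth of data; that trade-off keeps length() calls cheap. Each rolled file gets a fresh name from the %05d-%d-%s-%05d pattern (partition id, task id, uuid, file count), so all files from one write share the same uuid.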
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
index d8d164b..79a50d2 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
@@ -247,4 +248,94 @@ public class TestParquetWrite {
Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
Assert.assertEquals("Result rows should match", expected, actual);
}
+
+ @Test
+ public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws IOException {
+ File parent = temp.newFolder("parquet");
+ File location = new File(parent, "test");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ table.updateProperties()
+ .set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, "4") // ~4 bytes; low enough to roll a new file at every size check
+ .commit();
+
+ List<SimpleRecord> expected = Lists.newArrayListWithCapacity(4000);
+ for (int i = 0; i < 4000; i++) {
+ expected.add(new SimpleRecord(i, "a"));
+ }
+
+ Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+ df.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(location.toString());
+
+ table.refresh();
+
+ Dataset<Row> result = spark.read()
+ .format("iceberg")
+ .load(location.toString());
+
+ List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+ Assert.assertEquals("Result rows should match", expected, actual);
+
+ List<DataFile> files = Lists.newArrayList();
+ for (ManifestFile manifest : table.currentSnapshot().manifests()) {
+ for (DataFile file : ManifestReader.read(localInput(manifest.path()), null)) {
+ files.add(file);
+ }
+ }
+ Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
+ Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
+ }
+
+ @Test
+ public void testPartitionedCreateWithTargetFileSizeViaOption() throws IOException {
+ File parent = temp.newFolder("parquet");
+ File location = new File(parent, "test");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> expected = Lists.newArrayListWithCapacity(8000);
+ for (int i = 0; i < 2000; i++) {
+ expected.add(new SimpleRecord(i, "a"));
+ expected.add(new SimpleRecord(i, "b"));
+ expected.add(new SimpleRecord(i, "c"));
+ expected.add(new SimpleRecord(i, "d"));
+ }
+
+ Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+ df.select("id", "data").sort("data").write()
+ .format("iceberg")
+ .mode("append")
+ .option("target-file-size-bytes", 4) // ~4 bytes; low enough to trigger
+ .save(location.toString());
+
+ table.refresh();
+
+ Dataset<Row> result = spark.read()
+ .format("iceberg")
+ .load(location.toString());
+
+ List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+ Assert.assertEquals("Result rows should match", expected, actual);
+
+ List<DataFile> files = Lists.newArrayList();
+ for (ManifestFile manifest : table.currentSnapshot().manifests()) {
+ for (DataFile file : ManifestReader.read(localInput(manifest.path()), null)) {
+ files.add(file);
+ }
+ }
+ Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
+ Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
+ }
}
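
The expected file counts in these assertions follow directly from the 1000-row check interval in BaseWriter: with a 4-byte target, every size check already exceeds the target and rolls a new file. A small worked example of that arithmetic (row counts copied from the tests above):

    // unpartitioned test: 4000 rows, one size check per 1000 rows, every check rolls
    int unpartitionedFiles = 4000 / 1000;       // = 4 files of 1000 rows each

    // partitioned test: 8000 rows sorted into 4 partitions of 2000 rows; each
    // partition's writer rolls at its single mid-partition 1000-row check
    int partitionedFiles = 4 * (2000 / 1000);   // = 8 files of 1000 rows each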