You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/07/30 16:32:05 UTC
[incubator-iceberg] branch master updated: Add overwrite option to write builders (#318)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f4fc8ff Add overwrite option to write builders (#318)
f4fc8ff is described below
commit f4fc8ff7ef3495880ef6b360966f9c67544a4386
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Tue Jul 30 19:31:58 2019 +0300
Add overwrite option to write builders (#318)
---
.../main/java/org/apache/iceberg/ManifestListWriter.java | 1 +
core/src/main/java/org/apache/iceberg/ManifestWriter.java | 1 +
core/src/main/java/org/apache/iceberg/avro/Avro.java | 12 +++++++++++-
.../java/org/apache/iceberg/avro/AvroFileAppender.java | 6 +++---
orc/src/main/java/org/apache/iceberg/orc/ORC.java | 9 +++++++++
.../src/main/java/org/apache/iceberg/parquet/Parquet.java | 14 ++++++++++++--
.../java/org/apache/iceberg/parquet/ParquetWriter.java | 5 +++--
7 files changed, 40 insertions(+), 8 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
index 2d578e7..0271695 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
@@ -73,6 +73,7 @@ class ManifestListWriter implements FileAppender<ManifestFile> {
.schema(ManifestFile.schema())
.named("manifest_file")
.meta(meta)
+ .overwrite()
.build();
} catch (IOException e) {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index 0fe7da3..eb53829 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -195,6 +195,7 @@ public class ManifestWriter implements FileAppender<DataFile> {
.meta("schema", SchemaParser.toJson(spec.schema()))
.meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
.meta("partition-spec-id", String.valueOf(spec.specId()))
+ .overwrite()
.build();
default:
throw new IllegalArgumentException("Unsupported format: " + format);
diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index 65ba9d8..720baec 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -84,6 +84,7 @@ public class Avro {
private Map<String, String> config = Maps.newHashMap();
private Map<String, String> metadata = Maps.newLinkedHashMap();
private Function<Schema, DatumWriter<?>> createWriterFunc = GenericAvroWriter::new;
+ private boolean overwrite;
private WriteBuilder(OutputFile file) {
this.file = file;
@@ -124,6 +125,15 @@ public class Avro {
return this;
}
+ public WriteBuilder overwrite() {
+ return overwrite(true);
+ }
+
+ public WriteBuilder overwrite(boolean enabled) {
+ this.overwrite = enabled;
+ return this;
+ }
+
private CodecFactory codec() {
String codec = config.getOrDefault(AVRO_COMPRESSION, AVRO_COMPRESSION_DEFAULT);
try {
@@ -141,7 +151,7 @@ public class Avro {
meta("iceberg.schema", SchemaParser.toJson(schema));
return new AvroFileAppender<>(
- AvroSchemaUtil.convert(schema, name), file, createWriterFunc, codec(), metadata);
+ AvroSchemaUtil.convert(schema, name), file, createWriterFunc, codec(), metadata, overwrite);
}
}
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java b/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
index 8cc0712..a77b07d 100644
--- a/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
@@ -39,8 +39,9 @@ class AvroFileAppender<D> implements FileAppender<D> {
AvroFileAppender(Schema schema, OutputFile file,
Function<Schema, DatumWriter<?>> createWriterFunc,
- CodecFactory codec, Map<String, String> metadata) throws IOException {
- this.stream = file.create();
+ CodecFactory codec, Map<String, String> metadata,
+ boolean overwrite) throws IOException {
+ this.stream = overwrite ? file.createOrOverwrite() : file.create();
this.writer = newAvroWriter(schema, stream, createWriterFunc, codec, metadata);
}
@@ -92,7 +93,6 @@ class AvroFileAppender<D> implements FileAppender<D> {
writer.setMeta(entry.getKey(), entry.getValue());
}
- // TODO: support overwrite
return writer.create(schema, stream);
}
}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index a014d2a..0f7ee46 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -89,6 +89,15 @@ public class ORC {
return this;
}
+ public WriteBuilder overwrite() {
+ return overwrite(true);
+ }
+
+ public WriteBuilder overwrite(boolean enabled) {
+ OrcConf.OVERWRITE_OUTPUT_FILE.setBoolean(conf, enabled);
+ return this;
+ }
+
public <D> FileAppender<D> build() {
Preconditions.checkNotNull(schema, "Schema is required");
return new OrcFileAppender<>(TypeConversion.toOrc(schema, new ColumnIdMap()),
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 494f2c5..b7a31d0 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -85,6 +85,7 @@ public class Parquet {
private Map<String, String> config = Maps.newLinkedHashMap();
private Function<MessageType, ParquetValueWriter<?>> createWriterFunc = null;
private MetricsConfig metricsConfig = MetricsConfig.getDefault();
+ private ParquetFileWriter.Mode writeMode = ParquetFileWriter.Mode.CREATE;
private WriteBuilder(OutputFile file) {
this.file = file;
@@ -138,6 +139,15 @@ public class Parquet {
return this;
}
+ public WriteBuilder overwrite() {
+ return overwrite(true);
+ }
+
+ public WriteBuilder overwrite(boolean enabled) {
+ this.writeMode = enabled ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE;
+ return this;
+ }
+
@SuppressWarnings("unchecked")
private <T> WriteSupport<T> getWriteSupport(MessageType type) {
if (writeSupport != null) {
@@ -201,7 +211,7 @@ public class Parquet {
return new org.apache.iceberg.parquet.ParquetWriter<>(
conf, file, schema, rowGroupSize, metadata, createWriterFunc, codec(),
- parquetProperties, metricsConfig);
+ parquetProperties, metricsConfig, writeMode);
} else {
return new ParquetWriteAdapter<>(new ParquetWriteBuilder<D>(ParquetIO.file(file))
.withWriterVersion(writerVersion)
@@ -210,7 +220,7 @@ public class Parquet {
.setKeyValueMetadata(metadata)
.setWriteSupport(getWriteSupport(type))
.withCompressionCodec(codec())
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) // TODO: support modes
+ .withWriteMode(writeMode)
.withRowGroupSize(rowGroupSize)
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index c0956a1..593de9b 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -83,7 +83,8 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
CompressionCodecName codec,
ParquetProperties properties,
- MetricsConfig metricsConfig) {
+ MetricsConfig metricsConfig,
+ ParquetFileWriter.Mode writeMode) {
this.output = output;
this.targetRowGroupSize = rowGroupSize;
this.props = properties;
@@ -95,7 +96,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
try {
this.writer = new ParquetFileWriter(ParquetIO.file(output, conf), parquetSchema,
- ParquetFileWriter.Mode.OVERWRITE, rowGroupSize, 0);
+ writeMode, rowGroupSize, 0);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to create Parquet file");
}