You are viewing a plain-text version of this content. The canonical (HTML) version is linked from the original mailing-list archive page.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/07/30 16:32:05 UTC

[incubator-iceberg] branch master updated: Add overwrite option to write builders (#318)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f4fc8ff  Add overwrite option to write builders (#318)
f4fc8ff is described below

commit f4fc8ff7ef3495880ef6b360966f9c67544a4386
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Tue Jul 30 19:31:58 2019 +0300

    Add overwrite option to write builders (#318)
---
 .../main/java/org/apache/iceberg/ManifestListWriter.java   |  1 +
 core/src/main/java/org/apache/iceberg/ManifestWriter.java  |  1 +
 core/src/main/java/org/apache/iceberg/avro/Avro.java       | 12 +++++++++++-
 .../java/org/apache/iceberg/avro/AvroFileAppender.java     |  6 +++---
 orc/src/main/java/org/apache/iceberg/orc/ORC.java          |  9 +++++++++
 .../src/main/java/org/apache/iceberg/parquet/Parquet.java  | 14 ++++++++++++--
 .../java/org/apache/iceberg/parquet/ParquetWriter.java     |  5 +++--
 7 files changed, 40 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
index 2d578e7..0271695 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
@@ -73,6 +73,7 @@ class ManifestListWriter implements FileAppender<ManifestFile> {
           .schema(ManifestFile.schema())
           .named("manifest_file")
           .meta(meta)
+          .overwrite()
           .build();
 
     } catch (IOException e) {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index 0fe7da3..eb53829 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -195,6 +195,7 @@ public class ManifestWriter implements FileAppender<DataFile> {
               .meta("schema", SchemaParser.toJson(spec.schema()))
               .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
               .meta("partition-spec-id", String.valueOf(spec.specId()))
+              .overwrite()
               .build();
         default:
           throw new IllegalArgumentException("Unsupported format: " + format);
diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index 65ba9d8..720baec 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -84,6 +84,7 @@ public class Avro {
     private Map<String, String> config = Maps.newHashMap();
     private Map<String, String> metadata = Maps.newLinkedHashMap();
     private Function<Schema, DatumWriter<?>> createWriterFunc = GenericAvroWriter::new;
+    private boolean overwrite;
 
     private WriteBuilder(OutputFile file) {
       this.file = file;
@@ -124,6 +125,15 @@ public class Avro {
       return this;
     }
 
+    public WriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public WriteBuilder overwrite(boolean enabled) {
+      this.overwrite = enabled;
+      return this;
+    }
+
     private CodecFactory codec() {
       String codec = config.getOrDefault(AVRO_COMPRESSION, AVRO_COMPRESSION_DEFAULT);
       try {
@@ -141,7 +151,7 @@ public class Avro {
       meta("iceberg.schema", SchemaParser.toJson(schema));
 
       return new AvroFileAppender<>(
-          AvroSchemaUtil.convert(schema, name), file, createWriterFunc, codec(), metadata);
+          AvroSchemaUtil.convert(schema, name), file, createWriterFunc, codec(), metadata, overwrite);
     }
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java b/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
index 8cc0712..a77b07d 100644
--- a/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
@@ -39,8 +39,9 @@ class AvroFileAppender<D> implements FileAppender<D> {
 
   AvroFileAppender(Schema schema, OutputFile file,
                    Function<Schema, DatumWriter<?>> createWriterFunc,
-                   CodecFactory codec, Map<String, String> metadata) throws IOException {
-    this.stream = file.create();
+                   CodecFactory codec, Map<String, String> metadata,
+                   boolean overwrite) throws IOException {
+    this.stream = overwrite ? file.createOrOverwrite() : file.create();
     this.writer = newAvroWriter(schema, stream, createWriterFunc, codec, metadata);
   }
 
@@ -92,7 +93,6 @@ class AvroFileAppender<D> implements FileAppender<D> {
       writer.setMeta(entry.getKey(), entry.getValue());
     }
 
-    // TODO: support overwrite
     return writer.create(schema, stream);
   }
 }
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index a014d2a..0f7ee46 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -89,6 +89,15 @@ public class ORC {
       return this;
     }
 
+    public WriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public WriteBuilder overwrite(boolean enabled) {
+      OrcConf.OVERWRITE_OUTPUT_FILE.setBoolean(conf, enabled);
+      return this;
+    }
+
     public <D> FileAppender<D> build() {
       Preconditions.checkNotNull(schema, "Schema is required");
       return new OrcFileAppender<>(TypeConversion.toOrc(schema, new ColumnIdMap()),
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 494f2c5..b7a31d0 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -85,6 +85,7 @@ public class Parquet {
     private Map<String, String> config = Maps.newLinkedHashMap();
     private Function<MessageType, ParquetValueWriter<?>> createWriterFunc = null;
     private MetricsConfig metricsConfig = MetricsConfig.getDefault();
+    private ParquetFileWriter.Mode writeMode = ParquetFileWriter.Mode.CREATE;
 
     private WriteBuilder(OutputFile file) {
       this.file = file;
@@ -138,6 +139,15 @@ public class Parquet {
       return this;
     }
 
+    public WriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public WriteBuilder overwrite(boolean enabled) {
+      this.writeMode = enabled ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE;
+      return this;
+    }
+
     @SuppressWarnings("unchecked")
     private <T> WriteSupport<T> getWriteSupport(MessageType type) {
       if (writeSupport != null) {
@@ -201,7 +211,7 @@ public class Parquet {
 
         return new org.apache.iceberg.parquet.ParquetWriter<>(
             conf, file, schema, rowGroupSize, metadata, createWriterFunc, codec(),
-            parquetProperties, metricsConfig);
+            parquetProperties, metricsConfig, writeMode);
       } else {
         return new ParquetWriteAdapter<>(new ParquetWriteBuilder<D>(ParquetIO.file(file))
             .withWriterVersion(writerVersion)
@@ -210,7 +220,7 @@ public class Parquet {
             .setKeyValueMetadata(metadata)
             .setWriteSupport(getWriteSupport(type))
             .withCompressionCodec(codec())
-            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) // TODO: support modes
+            .withWriteMode(writeMode)
             .withRowGroupSize(rowGroupSize)
             .withPageSize(pageSize)
             .withDictionaryPageSize(dictionaryPageSize)
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index c0956a1..593de9b 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -83,7 +83,8 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
                 Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
                 CompressionCodecName codec,
                 ParquetProperties properties,
-                MetricsConfig metricsConfig) {
+                MetricsConfig metricsConfig,
+                ParquetFileWriter.Mode writeMode) {
     this.output = output;
     this.targetRowGroupSize = rowGroupSize;
     this.props = properties;
@@ -95,7 +96,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
 
     try {
       this.writer = new ParquetFileWriter(ParquetIO.file(output, conf), parquetSchema,
-          ParquetFileWriter.Mode.OVERWRITE, rowGroupSize, 0);
+         writeMode, rowGroupSize, 0);
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to create Parquet file");
     }