You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/03/19 22:41:55 UTC

[incubator-iceberg] branch master updated: Add length to FileAppender to avoid a call to S3 when writing. (#101)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e6bf4a  Add length to FileAppender to avoid a call to S3 when writing. (#101)
5e6bf4a is described below

commit 5e6bf4adbefe8bffb16675ef68601e4db5b0d942
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Tue Mar 19 15:41:51 2019 -0700

    Add length to FileAppender to avoid a call to S3 when writing. (#101)
---
 api/src/main/java/com/netflix/iceberg/Files.java   | 11 ++++++++--
 .../java/com/netflix/iceberg/io/FileAppender.java  |  5 +++++
 .../com/netflix/iceberg/ManifestListWriter.java    |  5 +++++
 .../java/com/netflix/iceberg/ManifestWriter.java   |  9 +++++---
 .../com/netflix/iceberg/avro/AvroFileAppender.java | 24 +++++++++++++++++++---
 .../com/netflix/iceberg/orc/OrcFileAppender.java   | 14 ++++++++++++-
 .../iceberg/parquet/ParquetWriteAdapter.java       |  8 ++++++++
 .../com/netflix/iceberg/parquet/ParquetWriter.java |  9 ++++++++
 8 files changed, 76 insertions(+), 9 deletions(-)

diff --git a/api/src/main/java/com/netflix/iceberg/Files.java b/api/src/main/java/com/netflix/iceberg/Files.java
index 197dcc1..0a2805f 100644
--- a/api/src/main/java/com/netflix/iceberg/Files.java
+++ b/api/src/main/java/com/netflix/iceberg/Files.java
@@ -62,7 +62,7 @@ public class Files {
       }
 
       try {
-        return new PositionFileOutputStream(new RandomAccessFile(file, "rw"));
+        return new PositionFileOutputStream(file, new RandomAccessFile(file, "rw"));
       } catch (FileNotFoundException e) {
         throw new RuntimeIOException(e, "Failed to create file: %s", file);
       }
@@ -185,14 +185,20 @@ public class Files {
   }
 
   private static class PositionFileOutputStream extends PositionOutputStream {
+    private final File file;
     private final RandomAccessFile stream;
+    private boolean isClosed = false;
 
-    private PositionFileOutputStream(RandomAccessFile stream) {
+    private PositionFileOutputStream(File file, RandomAccessFile stream) {
+      this.file = file;
       this.stream = stream;
     }
 
     @Override
     public long getPos() throws IOException {
+      if (isClosed) {
+        return file.length();
+      }
       return stream.getFilePointer();
     }
 
@@ -209,6 +215,7 @@ public class Files {
     @Override
     public void close() throws IOException {
       stream.close();
+      this.isClosed = true;
     }
 
     @Override
diff --git a/api/src/main/java/com/netflix/iceberg/io/FileAppender.java b/api/src/main/java/com/netflix/iceberg/io/FileAppender.java
index 6f51886..535d739 100644
--- a/api/src/main/java/com/netflix/iceberg/io/FileAppender.java
+++ b/api/src/main/java/com/netflix/iceberg/io/FileAppender.java
@@ -40,4 +40,9 @@ public interface FileAppender<D> extends Closeable {
    * @return {@link Metrics} for this file. Only valid after the file is closed.
    */
   Metrics metrics();
+
+  /**
+   * @return the length of this file in bytes. Only valid after the file is closed.
+   */
+  long length();
 }
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java b/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java
index 98cdbbf..d7ef491 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java
@@ -62,6 +62,11 @@ class ManifestListWriter implements FileAppender<ManifestFile> {
     writer.close();
   }
 
+  @Override
+  public long length() {
+    return writer.length();  // lets callers skip a filesystem (e.g. S3) call for the length
+  }
+
   private static FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
     try {
       return Avro.write(file)
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
index a59c100..9ecaa81 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
@@ -36,7 +36,6 @@ import static com.netflix.iceberg.ManifestEntry.Status.DELETED;
 class ManifestWriter implements FileAppender<DataFile> {
   private static final Logger LOG = LoggerFactory.getLogger(ManifestWriter.class);
 
-  private final String location;
   private final OutputFile file;
   private final int specId;
   private final FileAppender<ManifestEntry> writer;
@@ -50,7 +49,6 @@ class ManifestWriter implements FileAppender<DataFile> {
   private int deletedFiles = 0;
 
   ManifestWriter(PartitionSpec spec, OutputFile file, long snapshotId) {
-    this.location = file.location();
     this.file = file;
     this.specId = spec.specId();
     this.writer = newAppender(FileFormat.AVRO, spec, file);
@@ -119,9 +117,14 @@ class ManifestWriter implements FileAppender<DataFile> {
     return writer.metrics();
   }
 
+  @Override
+  public long length() {
+    return writer.length();  // length reported by the wrapped Avro appender; no filesystem lookup
+  }
+
   public ManifestFile toManifestFile() {
     Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed");
-    return new GenericManifestFile(location, file.toInputFile().getLength(), specId, snapshotId,
+    return new GenericManifestFile(file.location(), writer.length(), specId, snapshotId,
         addedFiles, existingFiles, deletedFiles, stats.summaries());
   }
 
diff --git a/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java b/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java
index e730ffa..b510180 100644
--- a/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java
+++ b/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java
@@ -19,10 +19,12 @@
 
 package com.netflix.iceberg.avro;
 
+import com.google.common.base.Preconditions;
 import com.netflix.iceberg.Metrics;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.io.FileAppender;
 import com.netflix.iceberg.io.OutputFile;
+import com.netflix.iceberg.io.PositionOutputStream;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
@@ -32,13 +34,15 @@ import java.util.Map;
 import java.util.function.Function;
 
 class AvroFileAppender<D> implements FileAppender<D> {
+  private PositionOutputStream stream = null;
   private DataFileWriter<D> writer = null;
   private long numRecords = 0L;
 
   AvroFileAppender(Schema schema, OutputFile file,
                    Function<Schema, DatumWriter<?>> createWriterFunc,
                    CodecFactory codec, Map<String, String> metadata) throws IOException {
-    this.writer = newAvroWriter(schema, file, createWriterFunc, codec, metadata);
+    this.stream = file.create();
+    this.writer = newAvroWriter(schema, stream, createWriterFunc, codec, metadata);
   }
 
   @Override
@@ -57,6 +61,20 @@ class AvroFileAppender<D> implements FileAppender<D> {
   }
 
   @Override
+  public long length() {
+    Preconditions.checkState(writer == null,
+        "Cannot return length while appending to an open file.");
+    if (stream == null) {
+      throw new RuntimeIOException("Failed to get stream length: no open stream");
+    }
+    try {
+      return stream.getPos();
+    } catch (IOException ioe) {
+      throw new RuntimeIOException(ioe, "Failed to get stream length");
+    }
+  }
+
+  @Override
   public void close() throws IOException {
     if (writer != null) {
       writer.close();
@@ -66,7 +84,7 @@ class AvroFileAppender<D> implements FileAppender<D> {
 
   @SuppressWarnings("unchecked")
   private static <D> DataFileWriter<D> newAvroWriter(
-      Schema schema, OutputFile file, Function<Schema, DatumWriter<?>> createWriterFunc,
+      Schema schema, PositionOutputStream stream, Function<Schema, DatumWriter<?>> createWriterFunc,
       CodecFactory codec, Map<String, String> metadata) throws IOException {
     DataFileWriter<D> writer = new DataFileWriter<>(
         (DatumWriter<D>) createWriterFunc.apply(schema));
@@ -78,6 +96,6 @@ class AvroFileAppender<D> implements FileAppender<D> {
     }
 
     // TODO: support overwrite
-    return writer.create(schema, file.create());
+    return writer.create(schema, stream);
   }
 }
diff --git a/orc/src/main/java/com/netflix/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/com/netflix/iceberg/orc/OrcFileAppender.java
index 6ad82ee..e020ff8 100644
--- a/orc/src/main/java/com/netflix/iceberg/orc/OrcFileAppender.java
+++ b/orc/src/main/java/com/netflix/iceberg/orc/OrcFileAppender.java
@@ -15,6 +15,7 @@
  */
 package com.netflix.iceberg.orc;
 
+import com.google.common.base.Preconditions;
 import com.netflix.iceberg.Metrics;
 import com.netflix.iceberg.Schema;
 import com.netflix.iceberg.io.FileAppender;
@@ -39,6 +40,7 @@ public class OrcFileAppender implements FileAppender<VectorizedRowBatch> {
   private final TypeDescription orcSchema;
   private final ColumnIdMap columnIds = new ColumnIdMap();
   private final Path path;
+  private boolean isClosed = false;
 
   public static final String COLUMN_NUMBERS_ATTRIBUTE = "iceberg.column.ids";
 
@@ -98,8 +100,18 @@ public class OrcFileAppender implements FileAppender<VectorizedRowBatch> {
   }
 
   @Override
+  public long length() {
+    Preconditions.checkState(isClosed, "Cannot return length while appending to an open file.");
+    // getRawDataSize() is the deserialized (in-memory) data size, not the bytes on disk
+    throw new UnsupportedOperationException("File length is not available from the ORC writer");
+  }
+
+  @Override
   public void close() throws IOException {
-    writer.close();
+    if (!isClosed) {
+      this.isClosed = true;
+      writer.close();
+    }
   }
 
   public TypeDescription getSchema() {
diff --git a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriteAdapter.java b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriteAdapter.java
index 37bc972..493832c 100644
--- a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriteAdapter.java
+++ b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriteAdapter.java
@@ -19,6 +19,7 @@
 
 package com.netflix.iceberg.parquet;
 
+import com.google.common.base.Preconditions;
 import com.netflix.iceberg.Metrics;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.io.FileAppender;
@@ -49,6 +50,13 @@ public class ParquetWriteAdapter<D> implements FileAppender<D> {
   }
 
   @Override
+  public long length() {
+    Preconditions.checkState(writer != null, "Cannot return length: writer is not available");
+    // NOTE(review): getDataSize() may exclude the Parquet footer, so it can be below the file size
+    return writer.getDataSize();
+  }
+
+  @Override
   public void close() throws IOException {
     if (writer != null) {
       writer.close();
diff --git a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java
index ee7ee8f..fe5760c 100644
--- a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java
@@ -118,6 +118,15 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
     return ParquetMetrics.fromMetadata(writer.getFooter());
   }
 
+  @Override
+  public long length() {  // NOTE(review): unlike the other appenders, no check that the file is closed
+    try {
+      return writer.getPos();
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to get file length");
+    }
+  }
+
   private void checkSize() {
     if (recordCount >= nextCheckRecordCount) {
       long bufferedSize = writeStore.getBufferedSize();