You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/03/19 22:41:55 UTC
[incubator-iceberg] branch master updated: Add length to FileAppender to avoid a call to S3 when writing. (#101)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 5e6bf4a Add length to FileAppender to avoid a call to S3 when writing. (#101)
5e6bf4a is described below
commit 5e6bf4adbefe8bffb16675ef68601e4db5b0d942
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Tue Mar 19 15:41:51 2019 -0700
Add length to FileAppender to avoid a call to S3 when writing. (#101)
---
api/src/main/java/com/netflix/iceberg/Files.java | 11 ++++++++--
.../java/com/netflix/iceberg/io/FileAppender.java | 5 +++++
.../com/netflix/iceberg/ManifestListWriter.java | 5 +++++
.../java/com/netflix/iceberg/ManifestWriter.java | 9 +++++---
.../com/netflix/iceberg/avro/AvroFileAppender.java | 24 +++++++++++++++++++---
.../com/netflix/iceberg/orc/OrcFileAppender.java | 14 ++++++++++++-
.../iceberg/parquet/ParquetWriteAdapter.java | 8 ++++++++
.../com/netflix/iceberg/parquet/ParquetWriter.java | 9 ++++++++
8 files changed, 76 insertions(+), 9 deletions(-)
diff --git a/api/src/main/java/com/netflix/iceberg/Files.java b/api/src/main/java/com/netflix/iceberg/Files.java
index 197dcc1..0a2805f 100644
--- a/api/src/main/java/com/netflix/iceberg/Files.java
+++ b/api/src/main/java/com/netflix/iceberg/Files.java
@@ -62,7 +62,7 @@ public class Files {
}
try {
- return new PositionFileOutputStream(new RandomAccessFile(file, "rw"));
+ return new PositionFileOutputStream(file, new RandomAccessFile(file, "rw"));
} catch (FileNotFoundException e) {
throw new RuntimeIOException(e, "Failed to create file: %s", file);
}
@@ -185,14 +185,20 @@ public class Files {
}
private static class PositionFileOutputStream extends PositionOutputStream {
+ private final File file;
private final RandomAccessFile stream;
+ private boolean isClosed = false;
- private PositionFileOutputStream(RandomAccessFile stream) {
+ /**
+ * Wraps an already-open RandomAccessFile.
+ *
+ * @param file the file that {@code stream} was opened on; kept so that getPos() can
+ * report {@code file.length()} after the stream has been closed
+ * @param stream the open RandomAccessFile backing this output stream
+ */
+ private PositionFileOutputStream(File file, RandomAccessFile stream) {
+ this.file = file;
this.stream = stream;
}
+ /**
+ * Returns the current position in the file.
+ * <p>
+ * While open this is the RandomAccessFile's file pointer. Once {@link #close()} has
+ * completed, it is {@code file.length()}, read from the file system instead of from the
+ * closed stream.
+ */
@Override
public long getPos() throws IOException {
+ // isClosed is set only by close(), after the underlying stream has been closed.
+ if (isClosed) {
+ return file.length();
+ }
return stream.getFilePointer();
}
@@ -209,6 +215,7 @@ public class Files {
+ /**
+ * Closes the underlying RandomAccessFile and records that this stream is closed, so that
+ * later getPos() calls return the file length.
+ */
@Override
public void close() throws IOException {
stream.close();
+ // NOTE(review): the flag is set only after stream.close() returns. If it throws, isClosed
+ // stays false and getPos() will still query the (possibly closed) stream.
+ this.isClosed = true;
}
@Override
diff --git a/api/src/main/java/com/netflix/iceberg/io/FileAppender.java b/api/src/main/java/com/netflix/iceberg/io/FileAppender.java
index 6f51886..535d739 100644
--- a/api/src/main/java/com/netflix/iceberg/io/FileAppender.java
+++ b/api/src/main/java/com/netflix/iceberg/io/FileAppender.java
@@ -40,4 +40,9 @@ public interface FileAppender<D> extends Closeable {
* @return {@link Metrics} for this file. Only valid after the file is closed.
*/
Metrics metrics();
+
+ // NOTE(review): a new abstract method on this public interface must be implemented by every
+ // FileAppender, including any outside this repository; confirm this API break is acceptable.
+ /**
+ * Returns the length of the file written by this appender, so that callers do not need to
+ * read it back from the file system (for example, with an extra S3 request).
+ *
+ * @return the length of this file. Only valid after the file is closed.
+ */
+ long length();
}
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java b/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java
index 98cdbbf..d7ef491 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java
@@ -62,6 +62,11 @@ class ManifestListWriter implements FileAppender<ManifestFile> {
writer.close();
}
+ /**
+ * Returns the length of the manifest list file as reported by the wrapped appender.
+ * Per the FileAppender contract, only valid after {@link #close()}.
+ */
+ @Override
+ public long length() {
+ return writer.length();
+ }
+
private static FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
try {
return Avro.write(file)
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
index a59c100..9ecaa81 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
@@ -36,7 +36,6 @@ import static com.netflix.iceberg.ManifestEntry.Status.DELETED;
class ManifestWriter implements FileAppender<DataFile> {
private static final Logger LOG = LoggerFactory.getLogger(ManifestWriter.class);
- private final String location;
private final OutputFile file;
private final int specId;
private final FileAppender<ManifestEntry> writer;
@@ -50,7 +49,6 @@ class ManifestWriter implements FileAppender<DataFile> {
private int deletedFiles = 0;
ManifestWriter(PartitionSpec spec, OutputFile file, long snapshotId) {
- this.location = file.location();
this.file = file;
this.specId = spec.specId();
this.writer = newAppender(FileFormat.AVRO, spec, file);
@@ -119,9 +117,14 @@ class ManifestWriter implements FileAppender<DataFile> {
return writer.metrics();
}
+ /**
+ * Returns the length of the manifest file as reported by the wrapped appender.
+ * Per the FileAppender contract, only valid after {@link #close()}.
+ */
+ @Override
+ public long length() {
+ return writer.length();
+ }
+
+ /**
+ * Builds the ManifestFile metadata for the manifest written by this writer.
+ * <p>
+ * The length comes from the underlying appender rather than from
+ * {@code file.toInputFile().getLength()}, which avoids a round trip to the file system.
+ * NOTE(review): the reported length is therefore only as accurate as the appender's
+ * length() after close. This writer's appender is created with FileFormat.AVRO.
+ *
+ * @return the ManifestFile for the closed manifest
+ * @throws IllegalStateException if this writer has not been closed
+ */
public ManifestFile toManifestFile() {
Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed");
- return new GenericManifestFile(location, file.toInputFile().getLength(), specId, snapshotId,
+ return new GenericManifestFile(file.location(), writer.length(), specId, snapshotId,
addedFiles, existingFiles, deletedFiles, stats.summaries());
}
diff --git a/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java b/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java
index e730ffa..b510180 100644
--- a/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java
+++ b/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java
@@ -19,10 +19,12 @@
package com.netflix.iceberg.avro;
+import com.google.common.base.Preconditions;
import com.netflix.iceberg.Metrics;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.io.FileAppender;
import com.netflix.iceberg.io.OutputFile;
+import com.netflix.iceberg.io.PositionOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
@@ -32,13 +34,15 @@ import java.util.Map;
import java.util.function.Function;
class AvroFileAppender<D> implements FileAppender<D> {
+ private PositionOutputStream stream = null;
private DataFileWriter<D> writer = null;
private long numRecords = 0L;
+ /**
+ * Creates {@code file} and opens an Avro DataFileWriter on the resulting stream.
+ * <p>
+ * The stream is kept in a field so that {@link #length()} can read its position instead
+ * of asking the file system for the file size.
+ *
+ * @throws IOException if newAvroWriter fails
+ */
AvroFileAppender(Schema schema, OutputFile file,
Function<Schema, DatumWriter<?>> createWriterFunc,
CodecFactory codec, Map<String, String> metadata) throws IOException {
- this.writer = newAvroWriter(schema, file, createWriterFunc, codec, metadata);
+ // NOTE(review): if newAvroWriter throws, the stream created here is never closed;
+ // consider closing it before rethrowing.
+ this.stream = file.create();
+ this.writer = newAvroWriter(schema, stream, createWriterFunc, codec, metadata);
}
@Override
@@ -57,6 +61,20 @@ class AvroFileAppender<D> implements FileAppender<D> {
}
+ /**
+ * Returns the position of the output stream, used as the length of the written file.
+ * <p>
+ * NOTE(review): the check below requires {@code writer == null}, so close() must reset the
+ * writer field. The part of close() shown in this diff does not show that; confirm it,
+ * otherwise this method always throws. Reading the position after close also relies on the
+ * stream supporting getPos() once closed. Files' local stream does so as of this change;
+ * verify this for other OutputFile implementations.
+ *
+ * @return the length of the written file
+ * @throws IllegalStateException if the appender is still open
+ * @throws RuntimeIOException if the stream position cannot be read
+ */
@Override
+ public long length() {
+ Preconditions.checkState(writer == null,
+ "Cannot return length while appending to an open file.");
+ if (stream != null) {
+ try {
+ return stream.getPos();
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to get stream length");
+ }
+ }
+ // NOTE(review): stream is assigned in the constructor and not reset in the lines shown,
+ // so this branch appears unreachable.
+ throw new RuntimeIOException("Failed to get stream length: no open stream");
+ }
+
+ @Override
public void close() throws IOException {
if (writer != null) {
writer.close();
@@ -66,7 +84,7 @@ class AvroFileAppender<D> implements FileAppender<D> {
@SuppressWarnings("unchecked")
private static <D> DataFileWriter<D> newAvroWriter(
- Schema schema, OutputFile file, Function<Schema, DatumWriter<?>> createWriterFunc,
+ Schema schema, PositionOutputStream stream, Function<Schema, DatumWriter<?>> createWriterFunc,
CodecFactory codec, Map<String, String> metadata) throws IOException {
DataFileWriter<D> writer = new DataFileWriter<>(
(DatumWriter<D>) createWriterFunc.apply(schema));
@@ -78,6 +96,6 @@ class AvroFileAppender<D> implements FileAppender<D> {
}
// TODO: support overwrite
- return writer.create(schema, file.create());
+ return writer.create(schema, stream);
}
}
diff --git a/orc/src/main/java/com/netflix/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/com/netflix/iceberg/orc/OrcFileAppender.java
index 6ad82ee..e020ff8 100644
--- a/orc/src/main/java/com/netflix/iceberg/orc/OrcFileAppender.java
+++ b/orc/src/main/java/com/netflix/iceberg/orc/OrcFileAppender.java
@@ -15,6 +15,7 @@
*/
package com.netflix.iceberg.orc;
+import com.google.common.base.Preconditions;
import com.netflix.iceberg.Metrics;
import com.netflix.iceberg.Schema;
import com.netflix.iceberg.io.FileAppender;
@@ -39,6 +40,7 @@ public class OrcFileAppender implements FileAppender<VectorizedRowBatch> {
private final TypeDescription orcSchema;
private final ColumnIdMap columnIds = new ColumnIdMap();
private final Path path;
+ private boolean isClosed = false;
public static final String COLUMN_NUMBERS_ATTRIBUTE = "iceberg.column.ids";
@@ -98,8 +100,18 @@ public class OrcFileAppender implements FileAppender<VectorizedRowBatch> {
}
+ /**
+ * Returns the length reported for the written ORC file. Only valid after close().
+ * <p>
+ * NOTE(review): ORC's Writer#getRawDataSize() is documented as the deserialized (raw) data
+ * size, not the number of bytes written to the file. This is therefore likely not the file
+ * length callers expect; confirm before relying on it.
+ *
+ * @throws IllegalStateException if the appender has not been closed
+ */
@Override
+ public long length() {
+ Preconditions.checkState(isClosed,
+ "Cannot return length while appending to an open file.");
+ return writer.getRawDataSize();
+ }
+
+ /**
+ * Closes the ORC writer. Calls after the first are no-ops.
+ */
+ @Override
public void close() throws IOException {
- writer.close();
+ if (!isClosed) {
+ // Marked closed before writer.close(): if that throws, later close() calls do not
+ // retry, and length() no longer rejects the call.
+ this.isClosed = true;
+ writer.close();
+ }
}
public TypeDescription getSchema() {
diff --git a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriteAdapter.java b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriteAdapter.java
index 37bc972..493832c 100644
--- a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriteAdapter.java
+++ b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriteAdapter.java
@@ -19,6 +19,7 @@
package com.netflix.iceberg.parquet;
+import com.google.common.base.Preconditions;
import com.netflix.iceberg.Metrics;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.io.FileAppender;
@@ -49,6 +50,13 @@ public class ParquetWriteAdapter<D> implements FileAppender<D> {
}
+ /**
+ * Intended to return the length of the written Parquet file; only valid after close().
+ * <p>
+ * NOTE(review): BUG. checkState passes only when {@code writer == null}, and the return
+ * statement then dereferences {@code writer}. This method therefore never returns a value.
+ * It throws IllegalStateException while the writer is set and NullPointerException once it
+ * is null. Capture the length before the writer is released (for example, in close()) and
+ * return it from a field instead.
+ */
@Override
+ public long length() {
+ Preconditions.checkState(writer == null,
+ "Cannot return length while appending to an open file.");
+ return writer.getDataSize();
+ }
+
+ @Override
public void close() throws IOException {
if (writer != null) {
writer.close();
diff --git a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java
index ee7ee8f..fe5760c 100644
--- a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java
@@ -118,6 +118,15 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
return ParquetMetrics.fromMetadata(writer.getFooter());
}
+ /**
+ * Returns the current position of the underlying file writer, used as the file length.
+ * <p>
+ * NOTE(review): unlike the other appenders, there is no closed-state check here. While the
+ * file is still open, the position presumably excludes data that is buffered but not yet
+ * flushed; confirm.
+ *
+ * @throws RuntimeIOException if the position cannot be read
+ */
+ @Override
+ public long length() {
+ try {
+ return writer.getPos();
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to get file length");
+ }
+ }
+
private void checkSize() {
if (recordCount >= nextCheckRecordCount) {
long bufferedSize = writeStore.getBufferedSize();