You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2020/04/09 03:28:49 UTC
[incubator-iceberg] branch master updated: Wrap DataFile when
writing to support any implementation (#901)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 8089a95 Wrap DataFile when writing to support any implementation (#901)
8089a95 is described below
commit 8089a9507af10dd3f96e49a1c89d65ec85718206
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Wed Apr 8 20:28:43 2020 -0700
Wrap DataFile when writing to support any implementation (#901)
---
.../java/org/apache/iceberg/ManifestEntry.java | 202 ++++++++++++++++++++-
.../java/org/apache/iceberg/ManifestWriter.java | 2 -
2 files changed, 201 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntry.java b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
index 728d923..82eca81 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntry.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
@@ -20,7 +20,10 @@
package org.apache.iceberg;
import com.google.common.base.MoreObjects;
+import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.List;
+import java.util.Map;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.iceberg.avro.AvroSchemaUtil;
@@ -49,20 +52,24 @@ class ManifestEntry implements IndexedRecord, SpecificData.SchemaConstructable {
}
private final org.apache.avro.Schema schema;
+ private final IndexedDataFile fileWrapper;
private Status status = Status.EXISTING;
private Long snapshotId = null;
private DataFile file = null;
ManifestEntry(org.apache.avro.Schema schema) {
this.schema = schema;
+ this.fileWrapper = null; // do not use the file wrapper to read
}
ManifestEntry(StructType partitionType) {
this.schema = AvroSchemaUtil.convert(getSchema(partitionType), "manifest_entry");
+ this.fileWrapper = new IndexedDataFile(schema.getField("data_file").schema());
}
private ManifestEntry(ManifestEntry toCopy, boolean fullCopy) {
this.schema = toCopy.schema;
+ this.fileWrapper = new IndexedDataFile(schema.getField("data_file").schema());
this.status = toCopy.status;
this.snapshotId = toCopy.snapshotId;
if (fullCopy) {
@@ -151,7 +158,11 @@ class ManifestEntry implements IndexedRecord, SpecificData.SchemaConstructable {
case 1:
return snapshotId;
case 2:
- return file;
+ if (fileWrapper == null || file instanceof GenericDataFile) {
+ return file;
+ } else {
+ return fileWrapper.wrap(file);
+ }
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + i);
}
@@ -187,4 +198,193 @@ class ManifestEntry implements IndexedRecord, SpecificData.SchemaConstructable {
.add("file", file)
.toString();
}
+
+ private static class IndexedStructLike implements StructLike, IndexedRecord { // adapter: exposes a StructLike through Avro's IndexedRecord interface by delegating to a wrapped struct
+ private final org.apache.avro.Schema avroSchema; // Avro schema reported by getSchema(); fixed at construction
+ private StructLike wrapped = null; // current delegate; stays null until wrap() is called, and every accessor dereferences it
+
+ IndexedStructLike(org.apache.avro.Schema avroSchema) {
+ this.avroSchema = avroSchema;
+ }
+
+ public IndexedStructLike wrap(StructLike struct) { // re-points this wrapper at another struct; no new object is allocated
+ this.wrapped = struct;
+ return this; // returns this same instance so the call can be used inline
+ }
+
+ @Override
+ public int size() {
+ return wrapped.size();
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ return wrapped.get(pos, javaClass);
+ }
+
+ @Override
+ public Object get(int pos) { // IndexedRecord accessor
+ return get(pos, Object.class); // requests the value as Object from the wrapped struct, with no conversion done here
+ }
+
+ @Override
+ public <T> void set(int pos, T value) {
+ wrapped.set(pos, value); // writes through to the wrapped struct
+ }
+
+ @Override
+ public void put(int pos, Object value) { // IndexedRecord mutator
+ set(pos, value); // forwards to set(), which writes into the wrapped struct
+ }
+
+ @Override
+ public org.apache.avro.Schema getSchema() {
+ return avroSchema;
+ }
+ }
+
+ private static class IndexedDataFile implements DataFile, IndexedRecord { // adapter: presents any DataFile as an Avro IndexedRecord by mapping field positions to DataFile accessors
+ private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024; // 64 MiB; returned for position 5 instead of a value from the wrapped file
+
+ private final org.apache.avro.Schema avroSchema; // Avro schema reported by getSchema(); fixed at construction
+ private final IndexedStructLike partitionWrapper; // single reusable wrapper, re-pointed at the current partition on each get(2)
+ private DataFile wrapped = null; // current delegate; stays null until wrap() is called, and every accessor dereferences it
+
+ IndexedDataFile(org.apache.avro.Schema avroSchema) {
+ this.avroSchema = avroSchema;
+ this.partitionWrapper = new IndexedStructLike(avroSchema.getField("partition").schema()); // uses the schema of the "partition" field of the given schema
+ }
+
+ public IndexedDataFile wrap(DataFile file) { // re-points this wrapper at another file; no new object is allocated
+ this.wrapped = file;
+ return this; // returns this same instance so the call can be used inline
+ }
+
+ @Override
+ public Object get(int pos) { // positional read; NOTE(review): positions presumably follow the data_file schema's field order - confirm against the schema definition
+ switch (pos) {
+ case 0:
+ return wrapped.path().toString(); // path() is a CharSequence; returned as a String
+ case 1:
+ return wrapped.format() != null ? wrapped.format().toString() : null; // format as a String, or null when the wrapped file reports no format
+ case 2:
+ return partitionWrapper.wrap(wrapped.partition()); // partition is returned through the shared IndexedStructLike wrapper, not directly
+ case 3:
+ return wrapped.recordCount();
+ case 4:
+ return wrapped.fileSizeInBytes();
+ case 5:
+ return DEFAULT_BLOCK_SIZE; // constant; not read from the wrapped file
+ case 6:
+ return wrapped.fileOrdinal();
+ case 7:
+ return wrapped.sortColumns();
+ case 8:
+ return wrapped.columnSizes();
+ case 9:
+ return wrapped.valueCounts();
+ case 10:
+ return wrapped.nullValueCounts();
+ case 11:
+ return wrapped.lowerBounds();
+ case 12:
+ return wrapped.upperBounds();
+ case 13:
+ return wrapped.keyMetadata();
+ case 14:
+ return wrapped.splitOffsets();
+ }
+ throw new IllegalArgumentException("Unknown field ordinal: " + pos); // reached only when pos is outside 0-14
+ }
+
+ @Override
+ public void put(int i, Object v) { // this wrapper only supports positional reads; put always throws
+ throw new UnsupportedOperationException("Cannot read into IndexedDataFile");
+ }
+
+ @Override
+ public org.apache.avro.Schema getSchema() {
+ return avroSchema;
+ }
+
+ @Override
+ public CharSequence path() { // this and the DataFile accessors below delegate directly to the wrapped file
+ return wrapped.path();
+ }
+
+ @Override
+ public FileFormat format() {
+ return wrapped.format();
+ }
+
+ @Override
+ public StructLike partition() {
+ return wrapped.partition(); // unlike get(2), returns the wrapped file's partition without the IndexedStructLike wrapper
+ }
+
+ @Override
+ public long recordCount() {
+ return wrapped.recordCount();
+ }
+
+ @Override
+ public long fileSizeInBytes() {
+ return wrapped.fileSizeInBytes();
+ }
+
+ @Override
+ public Integer fileOrdinal() {
+ return wrapped.fileOrdinal();
+ }
+
+ @Override
+ public List<Integer> sortColumns() {
+ return wrapped.sortColumns();
+ }
+
+ @Override
+ public Map<Integer, Long> columnSizes() {
+ return wrapped.columnSizes();
+ }
+
+ @Override
+ public Map<Integer, Long> valueCounts() {
+ return wrapped.valueCounts();
+ }
+
+ @Override
+ public Map<Integer, Long> nullValueCounts() {
+ return wrapped.nullValueCounts();
+ }
+
+ @Override
+ public Map<Integer, ByteBuffer> lowerBounds() {
+ return wrapped.lowerBounds();
+ }
+
+ @Override
+ public Map<Integer, ByteBuffer> upperBounds() {
+ return wrapped.upperBounds();
+ }
+
+ @Override
+ public ByteBuffer keyMetadata() {
+ return wrapped.keyMetadata();
+ }
+
+ @Override
+ public List<Long> splitOffsets() {
+ return wrapped.splitOffsets();
+ }
+
+ @Override
+ public DataFile copy() {
+ return wrapped.copy(); // returns the wrapped file's copy rather than a copy of this wrapper
+ }
+
+ @Override
+ public DataFile copyWithoutStats() {
+ return wrapped.copyWithoutStats(); // returns the wrapped file's copy rather than a copy of this wrapper
+ }
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index c99c491..fffac1f 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -148,8 +148,6 @@ public class ManifestWriter implements FileAppender<DataFile> {
*/
@Override
public void add(DataFile addedFile) {
- // TODO: this assumes that file is a GenericDataFile that can be written directly to Avro
- // Eventually, this should check in case there are other DataFile implementations.
addEntry(reused.wrapAppend(snapshotId, addedFile));
}