You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2020/04/09 03:28:49 UTC

[incubator-iceberg] branch master updated: Wrap DataFile when writing to support any implementation (#901)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 8089a95  Wrap DataFile when writing to support any implementation (#901)
8089a95 is described below

commit 8089a9507af10dd3f96e49a1c89d65ec85718206
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Wed Apr 8 20:28:43 2020 -0700

    Wrap DataFile when writing to support any implementation (#901)
---
 .../java/org/apache/iceberg/ManifestEntry.java     | 202 ++++++++++++++++++++-
 .../java/org/apache/iceberg/ManifestWriter.java    |   2 -
 2 files changed, 201 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntry.java b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
index 728d923..82eca81 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntry.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
@@ -20,7 +20,10 @@
 package org.apache.iceberg;
 
 import com.google.common.base.MoreObjects;
+import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.specific.SpecificData;
 import org.apache.iceberg.avro.AvroSchemaUtil;
@@ -49,20 +52,24 @@ class ManifestEntry implements IndexedRecord, SpecificData.SchemaConstructable {
   }
 
   private final org.apache.avro.Schema schema;
+  private final IndexedDataFile fileWrapper;
   private Status status = Status.EXISTING;
   private Long snapshotId = null;
   private DataFile file = null;
 
   ManifestEntry(org.apache.avro.Schema schema) {
     this.schema = schema;
+    this.fileWrapper = null; // do not use the file wrapper to read; with a null wrapper, get(2) returns the stored file unwrapped
   }
 
   ManifestEntry(StructType partitionType) {
     this.schema = AvroSchemaUtil.convert(getSchema(partitionType), "manifest_entry");
+    this.fileWrapper = new IndexedDataFile(schema.getField("data_file").schema()); // writer side: get(2) exposes non-GenericDataFile files through this wrapper
   }
 
   private ManifestEntry(ManifestEntry toCopy, boolean fullCopy) {
     this.schema = toCopy.schema;
+    this.fileWrapper = new IndexedDataFile(schema.getField("data_file").schema());
     this.status = toCopy.status;
     this.snapshotId = toCopy.snapshotId;
     if (fullCopy) {
@@ -151,7 +158,11 @@ class ManifestEntry implements IndexedRecord, SpecificData.SchemaConstructable {
       case 1:
         return snapshotId;
       case 2:
-        return file;
+        if (fileWrapper == null || file instanceof GenericDataFile) {
+          return file;
+        } else {
+          return fileWrapper.wrap(file);
+        }
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + i);
     }
@@ -187,4 +198,193 @@ class ManifestEntry implements IndexedRecord, SpecificData.SchemaConstructable {
         .add("file", file)
         .toString();
   }
+
+  private static class IndexedStructLike implements StructLike, IndexedRecord { // reusable Avro IndexedRecord view over any StructLike
+    private final org.apache.avro.Schema avroSchema; // returned by getSchema(); fixed at construction
+    private StructLike wrapped = null; // current delegate; null until wrap() is called, so accessors NPE before then
+
+    IndexedStructLike(org.apache.avro.Schema avroSchema) {
+      this.avroSchema = avroSchema;
+    }
+
+    public IndexedStructLike wrap(StructLike struct) { // rebinds this view to another struct and returns it; nothing is copied
+      this.wrapped = struct;
+      return this;
+    }
+
+    @Override
+    public int size() {
+      return wrapped.size();
+    }
+
+    @Override
+    public <T> T get(int pos, Class<T> javaClass) {
+      return wrapped.get(pos, javaClass);
+    }
+
+    @Override
+    public Object get(int pos) { // IndexedRecord accessor; values pass through unconverted (NOTE(review): assumes they are already Avro-compatible)
+      return get(pos, Object.class);
+    }
+
+    @Override
+    public <T> void set(int pos, T value) {
+      wrapped.set(pos, value);
+    }
+
+    @Override
+    public void put(int pos, Object value) { // IndexedRecord mutator; writes through to the wrapped struct
+      set(pos, value);
+    }
+
+    @Override
+    public org.apache.avro.Schema getSchema() {
+      return avroSchema;
+    }
+  }
+
+  private static class IndexedDataFile implements DataFile, IndexedRecord { // reusable, write-only Avro IndexedRecord view over any DataFile
+    private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024; // 64 MB; emitted for ordinal 5 regardless of the wrapped file
+
+    private final org.apache.avro.Schema avroSchema; // record schema returned by getSchema()
+    private final IndexedStructLike partitionWrapper; // reused for ordinal 2 instead of allocating a wrapper per call
+    private DataFile wrapped = null; // current delegate; null until wrap() is called, so accessors NPE before then
+
+    IndexedDataFile(org.apache.avro.Schema avroSchema) {
+      this.avroSchema = avroSchema;
+      this.partitionWrapper = new IndexedStructLike(avroSchema.getField("partition").schema()); // schema of the nested "partition" field
+    }
+
+    public IndexedDataFile wrap(DataFile file) { // rebinds this view to another file and returns it; nothing is copied
+      this.wrapped = file;
+      return this;
+    }
+
+    @Override
+    public Object get(int pos) { // NOTE(review): ordinals 0-14 assumed to match the data_file schema field order — confirm
+      switch (pos) {
+        case 0:
+          return wrapped.path().toString(); // CharSequence converted to String
+        case 1:
+          return wrapped.format() != null ? wrapped.format().toString() : null; // format written via toString(); null stays null
+        case 2:
+          return partitionWrapper.wrap(wrapped.partition());
+        case 3:
+          return wrapped.recordCount();
+        case 4:
+          return wrapped.fileSizeInBytes();
+        case 5:
+          return DEFAULT_BLOCK_SIZE; // constant; not taken from the wrapped file
+        case 6:
+          return wrapped.fileOrdinal();
+        case 7:
+          return wrapped.sortColumns();
+        case 8:
+          return wrapped.columnSizes();
+        case 9:
+          return wrapped.valueCounts();
+        case 10:
+          return wrapped.nullValueCounts();
+        case 11:
+          return wrapped.lowerBounds();
+        case 12:
+          return wrapped.upperBounds();
+        case 13:
+          return wrapped.keyMetadata();
+        case 14:
+          return wrapped.splitOffsets();
+      }
+      throw new IllegalArgumentException("Unknown field ordinal: " + pos);
+    }
+
+    @Override
+    public void put(int i, Object v) { // write-only view: reading Avro data into it is rejected
+      throw new UnsupportedOperationException("Cannot read into IndexedDataFile");
+    }
+
+    @Override
+    public org.apache.avro.Schema getSchema() {
+      return avroSchema;
+    }
+
+    @Override
+    public CharSequence path() { // DataFile accessors from here on delegate directly to the wrapped file
+      return wrapped.path();
+    }
+
+    @Override
+    public FileFormat format() {
+      return wrapped.format();
+    }
+
+    @Override
+    public StructLike partition() { // returns the raw partition, not partitionWrapper
+      return wrapped.partition();
+    }
+
+    @Override
+    public long recordCount() {
+      return wrapped.recordCount();
+    }
+
+    @Override
+    public long fileSizeInBytes() {
+      return wrapped.fileSizeInBytes();
+    }
+
+    @Override
+    public Integer fileOrdinal() {
+      return wrapped.fileOrdinal();
+    }
+
+    @Override
+    public List<Integer> sortColumns() {
+      return wrapped.sortColumns();
+    }
+
+    @Override
+    public Map<Integer, Long> columnSizes() {
+      return wrapped.columnSizes();
+    }
+
+    @Override
+    public Map<Integer, Long> valueCounts() {
+      return wrapped.valueCounts();
+    }
+
+    @Override
+    public Map<Integer, Long> nullValueCounts() {
+      return wrapped.nullValueCounts();
+    }
+
+    @Override
+    public Map<Integer, ByteBuffer> lowerBounds() {
+      return wrapped.lowerBounds();
+    }
+
+    @Override
+    public Map<Integer, ByteBuffer> upperBounds() {
+      return wrapped.upperBounds();
+    }
+
+    @Override
+    public ByteBuffer keyMetadata() {
+      return wrapped.keyMetadata();
+    }
+
+    @Override
+    public List<Long> splitOffsets() {
+      return wrapped.splitOffsets();
+    }
+
+    @Override
+    public DataFile copy() { // copies the wrapped file, not this wrapper
+      return wrapped.copy();
+    }
+
+    @Override
+    public DataFile copyWithoutStats() { // copies the wrapped file, not this wrapper
+      return wrapped.copyWithoutStats();
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index c99c491..fffac1f 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -148,8 +148,6 @@ public class ManifestWriter implements FileAppender<DataFile> {
    */
   @Override
   public void add(DataFile addedFile) {
-    // TODO: this assumes that file is a GenericDataFile that can be written directly to Avro
-    // Eventually, this should check in case there are other DataFile implementations.
     addEntry(reused.wrapAppend(snapshotId, addedFile));
   }