You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/02/05 00:53:41 UTC

[iceberg] branch master updated: Spark: Fix _pos metadata column in SparkAvroReader (#2215)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new bfa0f80  Spark: Fix _pos metadata column in SparkAvroReader (#2215)
bfa0f80 is described below

commit bfa0f80408819adfededd93a0dfac8207416281e
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Thu Feb 4 16:53:26 2021 -0800

    Spark: Fix _pos metadata column in SparkAvroReader (#2215)
---
 .../java/org/apache/iceberg/spark/data/SparkAvroReader.java   | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
index 46c594e..c693e2e 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
@@ -22,12 +22,14 @@ package org.apache.iceberg.spark.data;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
+import org.apache.iceberg.avro.SupportsRowPosition;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.avro.ValueReaders;
 import org.apache.iceberg.data.avro.DecoderResolver;
@@ -37,7 +39,7 @@ import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.catalyst.InternalRow;
 
 
-public class SparkAvroReader implements DatumReader<InternalRow> {
+public class SparkAvroReader implements DatumReader<InternalRow>, SupportsRowPosition {
 
   private final Schema readSchema;
   private final ValueReader<InternalRow> reader;
@@ -64,6 +66,13 @@ public class SparkAvroReader implements DatumReader<InternalRow> {
     return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, reader, reuse);
   }
 
+  @Override
+  public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+    if (reader instanceof SupportsRowPosition) {
+      ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
+    }
+  }
+
   private static class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
     private final Map<Integer, ?> idToConstant;