Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/20 21:42:09 UTC

[GitHub] [iceberg] rdblue opened a new pull request #1222: Add Avro row position reader

rdblue opened a new pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222


   This adds an Avro `ValueReader` that returns the position of a row within a file.
   
   The position reader's initial position is set from a callback that returns the starting row position of the split that is being read. The callback is passed to classes that implement a new interface, `SupportsRowPosition`. A callback is used because calculating the starting row position is expensive, so it is only computed when a position reader is actually present.
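
For illustration only, the callback idea could be sketched roughly as below. This is not the code in the PR: only the name `SupportsRowPosition` comes from the description, while the method name `setRowPositionSupplier`, the `PositionReader` class, and its `next()` method are assumptions made for this example.

```java
import java.util.function.Supplier;

final class LazyRowPosition {
  // Assumed shape of the interface; only the name SupportsRowPosition comes from the PR text.
  interface SupportsRowPosition {
    void setRowPositionSupplier(Supplier<Long> posSupplier);
  }

  // Hypothetical reader that hands out consecutive row positions.
  static final class PositionReader implements SupportsRowPosition {
    private Supplier<Long> posSupplier = () -> 0L;
    private boolean initialized = false;
    private long nextPosition = 0L;

    @Override
    public void setRowPositionSupplier(Supplier<Long> supplier) {
      this.posSupplier = supplier;
      this.initialized = false;
    }

    long next() {
      if (!initialized) {
        // the expensive lookup runs at most once, and only if a position is actually read
        this.nextPosition = posSupplier.get();
        this.initialized = true;
      }
      return nextPosition++;
    }
  }

  public static void main(String[] args) {
    PositionReader reader = new PositionReader();
    reader.setRowPositionSupplier(() -> {
      System.out.println("computing the split's starting row position");
      return 500L;
    });
    System.out.println(reader.next());
    System.out.println(reader.next());
  }
}
```

If no reader implements the interface, the supplier is never handed out and the scan for the starting position is skipped entirely.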
   
   Finding the row position at the start of a split requires scanning through an Avro file stream. `AvroIO` now includes a utility method that keeps track of the number of rows in each Avro block and seeks past the block content until the next block is after the given split starting point. This validates Avro sync bytes to ensure the count is accurate.
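
A standalone sketch of this scan, using only the JDK, might look like the following. It is a simplification under stated assumptions, not the `AvroIO` method itself: the header is assumed to be parsed already (the caller passes the offset of the first block and the file's sync bytes), the file is held in a byte array, and `RowPosScan`, `readLong`, `writeLong`, and `sampleFile` are names invented for this example.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;

final class RowPosScan {
  static final int SYNC_SIZE = 16;

  // Decodes one Avro long (zig-zag, variable-length) at the buffer's current position.
  static long readLong(ByteBuffer in) {
    long raw = 0;
    int shift = 0;
    int b;
    do {
      b = in.get() & 0xFF;
      raw |= (long) (b & 0x7F) << shift;
      shift += 7;
    } while ((b & 0x80) != 0);
    return (raw >>> 1) ^ -(raw & 1);
  }

  // Encodes one Avro long; only used to build the sample file below.
  static void writeLong(ByteArrayOutputStream out, long value) {
    long zz = (value << 1) ^ (value >> 63);
    while ((zz & ~0x7FL) != 0) {
      out.write((int) ((zz & 0x7F) | 0x80));
      zz >>>= 7;
    }
    out.write((int) zz);
  }

  // Counts the rows in all blocks that precede the first sync at or after start.
  static long findStartingRowPos(byte[] file, int firstBlockPos, byte[] fileSync, long start) {
    ByteBuffer in = ByteBuffer.wrap(file);
    in.position(firstBlockPos);
    byte[] blockSync = new byte[SYNC_SIZE];
    long totalRows = 0;
    long nextSyncPos = firstBlockPos;

    while (nextSyncPos < start) {
      if (nextSyncPos != in.position()) {
        // skip the block bytes and validate the sync that follows them
        in.position((int) nextSyncPos);
        in.get(blockSync);
        if (!Arrays.equals(fileSync, blockSync)) {
          throw new IllegalStateException("Invalid sync at " + nextSyncPos);
        }
      }
      if (!in.hasRemaining()) {
        break; // no block follows the last sync
      }
      long rowCount = readLong(in);
      long blockSize = readLong(in);
      totalRows += rowCount;
      nextSyncPos = in.position() + blockSize;
    }
    return totalRows;
  }

  // Sample layout: header sync, block (3 rows, 10 bytes), sync, block (5 rows, 4 bytes), sync.
  static byte[] sampleFile(byte[] sync) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(sync, 0, SYNC_SIZE);
    long[][] blocks = {{3, 10}, {5, 4}};
    for (long[] block : blocks) {
      writeLong(out, block[0]);
      writeLong(out, block[1]);
      out.write(new byte[(int) block[1]], 0, (int) block[1]);
      out.write(sync, 0, SYNC_SIZE);
    }
    return out.toByteArray();
  }

  public static void main(String[] args) {
    byte[] sync = new byte[SYNC_SIZE];
    Arrays.fill(sync, (byte) 7);
    byte[] file = sampleFile(sync);
    for (long start : new long[] {0, 17, 28, 29, file.length}) {
      long rows = findStartingRowPos(file, SYNC_SIZE, sync, start);
      System.out.println("start=" + start + " rows before split=" + rows);
    }
  }
}
```

Only the row count and byte size of each block are decoded; the block content itself is never read, which is what keeps the scan cheap relative to a full read.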


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1222: Add Avro row position reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222#discussion_r459713408



##########
File path: core/src/test/java/org/apache/iceberg/avro/TestAvroFileSplit.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAvroFileSplit {
+  private static final Schema SCHEMA = new Schema(
+      NestedField.required(1, "id", Types.LongType.get()),
+      NestedField.required(2, "data", Types.StringType.get()));
+
+  private static final int NUM_RECORDS = 100_000;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  public List<Record> expected = null;
+  public InputFile file = null;
+
+  @Before
+  public void writeDataFile() throws IOException {
+    this.expected = Lists.newArrayList();
+
+    OutputFile out = Files.localOutput(temp.newFile());
+
+    try (FileAppender<Object> writer = Avro.write(out)
+        .set(TableProperties.AVRO_COMPRESSION, "uncompressed")
+        .createWriterFunc(DataWriter::create)
+        .schema(SCHEMA)
+        .overwrite()
+        .build()) {
+
+      Record record = GenericRecord.create(SCHEMA);
+      for (long i = 0; i < NUM_RECORDS; i += 1) {
+        Record next = record.copy(ImmutableMap.of(
+            "id", i,
+            "data", UUID.randomUUID().toString()));
+        expected.add(next);
+        writer.add(next);
+      }
+    }
+
+    this.file = out.toInputFile();
+  }
+
+  @Test
+  public void testSplitDataSkipping() throws IOException {
+    long end = file.getLength();
+    long splitLocation = end / 2;
+
+    List<Record> firstHalf = readAvro(file, SCHEMA, 0, splitLocation);
+    Assert.assertNotEquals("First split should not be empty", 0, firstHalf.size());
+
+    List<Record> secondHalf = readAvro(file, SCHEMA, splitLocation + 1, end - splitLocation - 1);
+    Assert.assertNotEquals("Second split should not be empty", 0, secondHalf.size());
+
+    Assert.assertEquals("Total records should match expected",
+        expected.size(), firstHalf.size() + secondHalf.size());
+
+    for (int i = 0; i < firstHalf.size(); i += 1) {
+      Assert.assertEquals(expected.get(i), firstHalf.get(i));
+    }
+
+    for (int i = 0; i < secondHalf.size(); i += 1) {
+      Assert.assertEquals(expected.get(firstHalf.size() + i), secondHalf.get(i));
+    }
+  }
+
+  @Test
+  public void testPosField() throws IOException {
+    Schema projection = new Schema(
+        SCHEMA.columns().get(0),
+        MetadataColumns.ROW_POSITION,
+        SCHEMA.columns().get(1));
+
+    List<Record> records = readAvro(file, projection, 0, file.getLength());
+
+    for (int i = 0; i < expected.size(); i += 1) {
+      Assert.assertEquals("Field _pos should match",
+          (long) i, records.get(i).getField(MetadataColumns.ROW_POSITION.name()));
+      Assert.assertEquals("Field id should match",
+          expected.get(i).getField("id"), records.get(i).getField("id"));
+      Assert.assertEquals("Field data should match",
+          expected.get(i).getField("data"), records.get(i).getField("data"));
+    }
+  }
+
+  @Test
+  public void testPosFieldWithSplits() throws IOException {
+    Schema projection = new Schema(
+        SCHEMA.columns().get(0),
+        MetadataColumns.ROW_POSITION,
+        SCHEMA.columns().get(1));
+
+    long end = file.getLength();
+    long splitLocation = end / 2;
+
+    List<Record> secondHalf = readAvro(file, projection, splitLocation + 1, end - splitLocation - 1);
+    Assert.assertNotEquals("Second split should not be empty", 0, secondHalf.size());
+
+    List<Record> firstHalf = readAvro(file, projection, 0, splitLocation);
+    Assert.assertNotEquals("First split should not be empty", 0, firstHalf.size());
+
+    Assert.assertEquals("Total records should match expected",
+        expected.size(), firstHalf.size() + secondHalf.size());
+
+    for (int i = 0; i < firstHalf.size(); i += 1) {
+      Assert.assertEquals("Field _pos should match",
+          (long) i, firstHalf.get(i).getField(MetadataColumns.ROW_POSITION.name()));
+      Assert.assertEquals("Field id should match",
+          expected.get(i).getField("id"), firstHalf.get(i).getField("id"));
+      Assert.assertEquals("Field data should match",
+          expected.get(i).getField("data"), firstHalf.get(i).getField("data"));
+    }
+
+    for (int i = 0; i < secondHalf.size(); i += 1) {
+      Assert.assertEquals("Field _pos should match",
+          (long) (firstHalf.size() + i), secondHalf.get(i).getField(MetadataColumns.ROW_POSITION.name()));
+      Assert.assertEquals("Field id should match",
+          expected.get(firstHalf.size() + i).getField("id"), secondHalf.get(i).getField("id"));
+      Assert.assertEquals("Field data should match",
+          expected.get(firstHalf.size() + i).getField("data"), secondHalf.get(i).getField("data"));
+    }
+  }
+
+  @Test
+  public void testPosWithEOFSplit() throws IOException {

Review comment:
       @shardulm94, here's a test for the EOF case.






[GitHub] [iceberg] rdblue commented on a change in pull request #1222: Add Avro row position reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222#discussion_r459626570



##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroIO.java
##########
@@ -131,4 +144,55 @@ public boolean markSupported() {
       return stream.markSupported();
     }
   }
+
+  static long findStartingRowPos(Supplier<SeekableInputStream> open, long start) {
+    try (SeekableInputStream in = open.get()) {
+      // use a direct decoder that will not buffer so the position of the input stream is accurate
+      BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
+
+      // an Avro file's layout looks like this:
+      //   header|block|block|...
+      // the header contains:
+      //   magic|string-map|sync
+      // each block consists of:
+      //   row-count|compressed-size-in-bytes|block-bytes|sync
+
+      // it is necessary to read the header here because this is the only way to get the expected file sync bytes
+      byte[] magic = MAGIC_READER.read(decoder, null);
+      if (!Arrays.equals(AVRO_MAGIC, magic)) {
+        throw new InvalidAvroMagicException("Not an Avro file");
+      }
+
+      META_READER.read(decoder, null); // ignore the file metadata, it isn't needed
+      byte[] fileSync = SYNC_READER.read(decoder, null);
+
+      // the while loop reads row counts and seeks past the block bytes until the next sync pos is >= start, which
+      // indicates that the next sync is the start of the split.
+      byte[] blockSync = new byte[16];
+      long totalRows = 0;
+      long nextSyncPos = in.getPos();
+
+      while (nextSyncPos < start) {

Review comment:
       Yeah, this is a bit strange in Avro. If the `start` of a split is past the start of the sync, then the split will actually start from the next sync.
   
   To find the actual start of the split, not the start of the assigned range, Avro will read 16 bytes from `start` and then continually read one more byte and see if the rolling 16 match the sync. In the case that `start` is `syncPos + 1`, it won't match the sync bytes until the _next_ sync, even though the actual block starts after `start`.
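
To make that concrete, here is a small sketch of the search. It uses a plain sliding-window comparison over a byte array in place of Avro's rolling 16-byte buffer, and `SyncScan` and `nextSync` are names made up for this example rather than Avro or Iceberg APIs.

```java
final class SyncScan {
  static final int SYNC_SIZE = 16;

  // Returns the offset of the first sync marker that begins at or after start, or -1 if none.
  static long nextSync(byte[] file, byte[] sync, long start) {
    for (long pos = Math.max(start, 0); pos + SYNC_SIZE <= file.length; pos += 1) {
      boolean matches = true;
      for (int i = 0; i < SYNC_SIZE && matches; i += 1) {
        matches = file[(int) pos + i] == sync[i];
      }
      if (matches) {
        return pos;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    byte[] sync = new byte[SYNC_SIZE];
    for (int i = 0; i < SYNC_SIZE; i += 1) {
      sync[i] = (byte) (i + 1);
    }

    // sync markers at offsets 28 and 50, zero bytes everywhere else
    byte[] file = new byte[66];
    System.arraycopy(sync, 0, file, 28, SYNC_SIZE);
    System.arraycopy(sync, 0, file, 50, SYNC_SIZE);

    System.out.println(nextSync(file, sync, 28)); // start == syncPos
    System.out.println(nextSync(file, sync, 29)); // start == syncPos + 1
  }
}
```

With `start` equal to the sync offset the marker is found immediately, while `start` one byte later skips to the following marker, which is why the loop condition compares the sync position against `start` with `<`.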






[GitHub] [iceberg] rdsr commented on a change in pull request #1222: Add Avro row position reader

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222#discussion_r458961341



##########
File path: core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
##########
@@ -117,6 +126,7 @@ private ReadBuilder(Map<Integer, ?> idToConstant) {
     @Override
     public ValueReader<?> record(Types.StructType struct, Schema record,
                                  List<String> names, List<ValueReader<?>> fields) {
+

Review comment:
       Removing the empty line? I'm OK






[GitHub] [iceberg] shardulm94 commented on a change in pull request #1222: Add Avro row position reader

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222#discussion_r459239292



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
##########
@@ -582,13 +585,37 @@ protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Ma
         if (idToConstant.containsKey(field.fieldId())) {
           positionList.add(pos);
           constantList.add(idToConstant.get(field.fieldId()));
+        } else if (field.fieldId() == MetadataColumns.ROW_POSITION.fieldId()) {
+          // replace the _pos field reader with a position reader
+          // only if the position reader is set and this is a top-level field

Review comment:
       I don't understand this comment line
   > only if the position reader is set
   
   reader is set where?
   > and this is a top-level field
   
   I don't see where the top-level field restriction comes from






[GitHub] [iceberg] rdblue merged pull request #1222: Add Avro row position reader

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222


   




[GitHub] [iceberg] shardulm94 commented on pull request #1222: Add Avro row position reader

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222#issuecomment-663223521


   +1 LGTM




[GitHub] [iceberg] rdblue commented on a change in pull request #1222: Add Avro row position reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222#discussion_r459712376



##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroIO.java
##########
@@ -131,4 +144,55 @@ public boolean markSupported() {
       return stream.markSupported();
     }
   }
+
+  static long findStartingRowPos(Supplier<SeekableInputStream> open, long start) {
+    try (SeekableInputStream in = open.get()) {
+      // use a direct decoder that will not buffer so the position of the input stream is accurate
+      BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
+
+      // an Avro file's layout looks like this:
+      //   header|block|block|...
+      // the header contains:
+      //   magic|string-map|sync
+      // each block consists of:
+      //   row-count|compressed-size-in-bytes|block-bytes|sync
+
+      // it is necessary to read the header here because this is the only way to get the expected file sync bytes
+      byte[] magic = MAGIC_READER.read(decoder, null);
+      if (!Arrays.equals(AVRO_MAGIC, magic)) {
+        throw new InvalidAvroMagicException("Not an Avro file");
+      }
+
+      META_READER.read(decoder, null); // ignore the file metadata, it isn't needed
+      byte[] fileSync = SYNC_READER.read(decoder, null);
+
+      // the while loop reads row counts and seeks past the block bytes until the next sync pos is >= start, which
+      // indicates that the next sync is the start of the split.
+      byte[] blockSync = new byte[16];
+      long totalRows = 0;
+      long nextSyncPos = in.getPos();
+
+      while (nextSyncPos < start) {

Review comment:
       The fix is to catch `EOFException` and return the number of rows.
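
As a rough sketch of that fix, assuming a deliberately simplified block layout (a 4-byte row count, a 4-byte size, then the block bytes, with no sync markers) rather than the real Avro encoding, and with `EofRowCount`, `countRowsBefore`, and `sampleFile` as made-up names:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

final class EofRowCount {
  // Counts rows in the blocks that begin before start; blocks are [int rowCount][int size][bytes].
  static long countRowsBefore(byte[] file, long start) {
    long totalRows = 0;
    long pos = 0;
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(file))) {
      while (pos < start) {
        int rowCount = in.readInt(); // throws EOFException once the stream is exhausted
        int size = in.readInt();
        in.readFully(new byte[size]);
        totalRows += rowCount;
        pos += 8L + size;
      }
    } catch (EOFException e) {
      // the split starts at or past the end of the last block, so every row read so far precedes it
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return totalRows;
  }

  // Two sample blocks: 3 rows in 10 bytes, then 5 rows in 4 bytes (30 bytes in total).
  static byte[] sampleFile() {
    return ByteBuffer.allocate(30)
        .putInt(3).putInt(10).put(new byte[10])
        .putInt(5).putInt(4).put(new byte[4])
        .array();
  }

  public static void main(String[] args) {
    byte[] file = sampleFile();
    System.out.println(countRowsBefore(file, 19));
    System.out.println(countRowsBefore(file, file.length + 1));
  }
}
```

A split whose start lies beyond the last block then reports the file's full row count instead of failing with an exception.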






[GitHub] [iceberg] rdblue commented on a change in pull request #1222: Add Avro row position reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222#discussion_r459628142



##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroIO.java
##########
@@ -131,4 +144,55 @@ public boolean markSupported() {
       return stream.markSupported();
     }
   }
+
+  static long findStartingRowPos(Supplier<SeekableInputStream> open, long start) {
+    try (SeekableInputStream in = open.get()) {
+      // use a direct decoder that will not buffer so the position of the input stream is accurate
+      BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
+
+      // an Avro file's layout looks like this:
+      //   header|block|block|...
+      // the header contains:
+      //   magic|string-map|sync
+      // each block consists of:
+      //   row-count|compressed-size-in-bytes|block-bytes|sync
+
+      // it is necessary to read the header here because this is the only way to get the expected file sync bytes
+      byte[] magic = MAGIC_READER.read(decoder, null);
+      if (!Arrays.equals(AVRO_MAGIC, magic)) {
+        throw new InvalidAvroMagicException("Not an Avro file");
+      }
+
+      META_READER.read(decoder, null); // ignore the file metadata, it isn't needed
+      byte[] fileSync = SYNC_READER.read(decoder, null);
+
+      // the while loop reads row counts and seeks past the block bytes until the next sync pos is >= start, which
+      // indicates that the next sync is the start of the split.
+      byte[] blockSync = new byte[16];
+      long totalRows = 0;
+      long nextSyncPos = in.getPos();
+
+      while (nextSyncPos < start) {
+        if (nextSyncPos != in.getPos()) {
+          in.seek(nextSyncPos);
+          SYNC_READER.read(decoder, blockSync);
+
+          if (!Arrays.equals(fileSync, blockSync)) {
+            throw new RuntimeIOException("Invalid sync at %s", nextSyncPos);

Review comment:
       Yes, but readers still expect it from existing code paths. Since this code is called in paths that may still depend on `RuntimeIOException`, we should still throw it here. I think we should continue to throw it until the 0.11 release, when we can move all of the instances to `UncheckedIOException`.






[GitHub] [iceberg] shardulm94 commented on a change in pull request #1222: Add Avro row position reader

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222#discussion_r459667153



##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroIO.java
##########
@@ -131,4 +144,55 @@ public boolean markSupported() {
       return stream.markSupported();
     }
   }
+
+  static long findStartingRowPos(Supplier<SeekableInputStream> open, long start) {
+    try (SeekableInputStream in = open.get()) {
+      // use a direct decoder that will not buffer so the position of the input stream is accurate
+      BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
+
+      // an Avro file's layout looks like this:
+      //   header|block|block|...
+      // the header contains:
+      //   magic|string-map|sync
+      // each block consists of:
+      //   row-count|compressed-size-in-bytes|block-bytes|sync
+
+      // it is necessary to read the header here because this is the only way to get the expected file sync bytes
+      byte[] magic = MAGIC_READER.read(decoder, null);
+      if (!Arrays.equals(AVRO_MAGIC, magic)) {
+        throw new InvalidAvroMagicException("Not an Avro file");
+      }
+
+      META_READER.read(decoder, null); // ignore the file metadata, it isn't needed
+      byte[] fileSync = SYNC_READER.read(decoder, null);
+
+      // the while loop reads row counts and seeks past the block bytes until the next sync pos is >= start, which
+      // indicates that the next sync is the start of the split.
+      byte[] blockSync = new byte[16];
+      long totalRows = 0;
+      long nextSyncPos = in.getPos();
+
+      while (nextSyncPos < start) {

Review comment:
       Thanks for the insightful comment! (before it went away 😃 ) I was not familiar with Avro's split reading behavior.






[GitHub] [iceberg] rdblue commented on a change in pull request #1222: Add Avro row position reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222#discussion_r459681535



##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroIO.java
##########
@@ -131,4 +144,55 @@ public boolean markSupported() {
       return stream.markSupported();
     }
   }
+
+  static long findStartingRowPos(Supplier<SeekableInputStream> open, long start) {
+    try (SeekableInputStream in = open.get()) {
+      // use a direct decoder that will not buffer so the position of the input stream is accurate
+      BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
+
+      // an Avro file's layout looks like this:
+      //   header|block|block|...
+      // the header contains:
+      //   magic|string-map|sync
+      // each block consists of:
+      //   row-count|compressed-size-in-bytes|block-bytes|sync
+
+      // it is necessary to read the header here because this is the only way to get the expected file sync bytes
+      byte[] magic = MAGIC_READER.read(decoder, null);
+      if (!Arrays.equals(AVRO_MAGIC, magic)) {
+        throw new InvalidAvroMagicException("Not an Avro file");
+      }
+
+      META_READER.read(decoder, null); // ignore the file metadata, it isn't needed
+      byte[] fileSync = SYNC_READER.read(decoder, null);
+
+      // the while loop reads row counts and seeks past the block bytes until the next sync pos is >= start, which
+      // indicates that the next sync is the start of the split.
+      byte[] blockSync = new byte[16];
+      long totalRows = 0;
+      long nextSyncPos = in.getPos();
+
+      while (nextSyncPos < start) {

Review comment:
       Sorry, I should have edited, not deleted. I missed the part about it being just before the EOF.






[GitHub] [iceberg] rdsr commented on a change in pull request #1222: Add Avro row position reader

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222#discussion_r459621640



##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroIO.java
##########
@@ -131,4 +144,55 @@ public boolean markSupported() {
       return stream.markSupported();
     }
   }
+
+  static long findStartingRowPos(Supplier<SeekableInputStream> open, long start) {
+    try (SeekableInputStream in = open.get()) {
+      // use a direct decoder that will not buffer so the position of the input stream is accurate
+      BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
+
+      // an Avro file's layout looks like this:
+      //   header|block|block|...
+      // the header contains:
+      //   magic|string-map|sync
+      // each block consists of:
+      //   row-count|compressed-size-in-bytes|block-bytes|sync
+
+      // it is necessary to read the header here because this is the only way to get the expected file sync bytes
+      byte[] magic = MAGIC_READER.read(decoder, null);
+      if (!Arrays.equals(AVRO_MAGIC, magic)) {
+        throw new InvalidAvroMagicException("Not an Avro file");
+      }
+
+      META_READER.read(decoder, null); // ignore the file metadata, it isn't needed
+      byte[] fileSync = SYNC_READER.read(decoder, null);
+
+      // the while loop reads row counts and seeks past the block bytes until the next sync pos is >= start, which
+      // indicates that the next sync is the start of the split.
+      byte[] blockSync = new byte[16];
+      long totalRows = 0;
+      long nextSyncPos = in.getPos();
+
+      while (nextSyncPos < start) {
+        if (nextSyncPos != in.getPos()) {
+          in.seek(nextSyncPos);
+          SYNC_READER.read(decoder, blockSync);
+
+          if (!Arrays.equals(fileSync, blockSync)) {
+            throw new RuntimeIOException("Invalid sync at %s", nextSyncPos);

Review comment:
       nit: This is deprecated in favor of UncheckedIOException






[GitHub] [iceberg] rdsr commented on a change in pull request #1222: Add Avro row position reader

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222#discussion_r459659466



##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroIO.java
##########
@@ -131,4 +144,55 @@ public boolean markSupported() {
       return stream.markSupported();
     }
   }
+
+  static long findStartingRowPos(Supplier<SeekableInputStream> open, long start) {
+    try (SeekableInputStream in = open.get()) {
+      // use a direct decoder that will not buffer so the position of the input stream is accurate
+      BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
+
+      // an Avro file's layout looks like this:
+      //   header|block|block|...
+      // the header contains:
+      //   magic|string-map|sync
+      // each block consists of:
+      //   row-count|compressed-size-in-bytes|block-bytes|sync
+
+      // it is necessary to read the header here because this is the only way to get the expected file sync bytes
+      byte[] magic = MAGIC_READER.read(decoder, null);
+      if (!Arrays.equals(AVRO_MAGIC, magic)) {
+        throw new InvalidAvroMagicException("Not an Avro file");
+      }
+
+      META_READER.read(decoder, null); // ignore the file metadata, it isn't needed
+      byte[] fileSync = SYNC_READER.read(decoder, null);
+
+      // the while loop reads row counts and seeks past the block bytes until the next sync pos is >= start, which
+      // indicates that the next sync is the start of the split.
+      byte[] blockSync = new byte[16];
+      long totalRows = 0;
+      long nextSyncPos = in.getPos();
+
+      while (nextSyncPos < start) {
+        if (nextSyncPos != in.getPos()) {
+          in.seek(nextSyncPos);
+          SYNC_READER.read(decoder, blockSync);
+
+          if (!Arrays.equals(fileSync, blockSync)) {
+            throw new RuntimeIOException("Invalid sync at %s", nextSyncPos);

Review comment:
       Makes sense






[GitHub] [iceberg] shardulm94 commented on a change in pull request #1222: Add Avro row position reader

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222#discussion_r459239292



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
##########
@@ -582,13 +585,37 @@ protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Ma
         if (idToConstant.containsKey(field.fieldId())) {
           positionList.add(pos);
           constantList.add(idToConstant.get(field.fieldId()));
+        } else if (field.fieldId() == MetadataColumns.ROW_POSITION.fieldId()) {
+          // replace the _pos field reader with a position reader
+          // only if the position reader is set and this is a top-level field

Review comment:
       Not sure I understand this comment line
   > only if the position reader is set
   
   Where is the reader set?
   > and this is a top-level field
   
   I don't see where the top-level field restriction comes from







[GitHub] [iceberg] shardulm94 commented on a change in pull request #1222: Add Avro row position reader

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222#discussion_r459247710



##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroIO.java
##########
@@ -131,4 +144,55 @@ public boolean markSupported() {
       return stream.markSupported();
     }
   }
+
+  static long findStartingRowPos(Supplier<SeekableInputStream> open, long start) {
+    try (SeekableInputStream in = open.get()) {
+      // use a direct decoder that will not buffer so the position of the input stream is accurate
+      BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
+
+      // an Avro file's layout looks like this:
+      //   header|block|block|...
+      // the header contains:
+      //   magic|string-map|sync
+      // each block consists of:
+      //   row-count|compressed-size-in-bytes|block-bytes|sync
+
+      // it is necessary to read the header here because this is the only way to get the expected file sync bytes
+      byte[] magic = MAGIC_READER.read(decoder, null);
+      if (!Arrays.equals(AVRO_MAGIC, magic)) {
+        throw new InvalidAvroMagicException("Not an Avro file");
+      }
+
+      META_READER.read(decoder, null); // ignore the file metadata, it isn't needed
+      byte[] fileSync = SYNC_READER.read(decoder, null);
+
+      // the while loop reads row counts and seeks past the block bytes until the next sync pos is >= start, which
+      // indicates that the next sync is the start of the split.
+      byte[] blockSync = new byte[16];
+      long totalRows = 0;
+      long nextSyncPos = in.getPos();
+
+      while (nextSyncPos < start) {

Review comment:
       Not sure if this is a valid start state, but I think there can be an edge case here
   ```
   row-count|compressed-size-in-bytes|block-bytes|sync|EOF
                                                    ^
                                                    |
                                                  start
   ```
   In this case it will seek to and read the sync, and then go on to read the next block's row count even though it has reached EOF
   
   So it should probably be `while (nextSyncPos + 16 < start)`?






[GitHub] [iceberg] rdblue commented on a change in pull request #1222: Add Avro row position reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222#discussion_r459632521



##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroIO.java
##########
@@ -131,4 +144,55 @@ public boolean markSupported() {
       return stream.markSupported();
     }
   }
+
+  static long findStartingRowPos(Supplier<SeekableInputStream> open, long start) {
+    try (SeekableInputStream in = open.get()) {
+      // use a direct decoder that will not buffer so the position of the input stream is accurate
+      BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
+
+      // an Avro file's layout looks like this:
+      //   header|block|block|...
+      // the header contains:
+      //   magic|string-map|sync
+      // each block consists of:
+      //   row-count|compressed-size-in-bytes|block-bytes|sync
+
+      // it is necessary to read the header here because this is the only way to get the expected file sync bytes
+      byte[] magic = MAGIC_READER.read(decoder, null);
+      if (!Arrays.equals(AVRO_MAGIC, magic)) {
+        throw new InvalidAvroMagicException("Not an Avro file");
+      }
+
+      META_READER.read(decoder, null); // ignore the file metadata, it isn't needed
+      byte[] fileSync = SYNC_READER.read(decoder, null);
+
+      // the while loop reads row counts and seeks past the block bytes until the next sync pos is >= start, which
+      // indicates that the next sync is the start of the split.
+      byte[] blockSync = new byte[16];
+      long totalRows = 0;
+      long nextSyncPos = in.getPos();
+
+      while (nextSyncPos < start) {

Review comment:
       The loop condition is correct with respect to `start` because a block starts at the beginning of a sync. You're right about the EOF behavior, though. I'll take a look at that.
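
A minimal sketch of the two points in this exchange, written as a simplified in-memory model rather than the `AvroIO` code. It assumes each block is identified by the offset of the sync that precedes it, so the `< start` comparison stays as it is, and it adds a guard that stops the loop when the only thing left after the last sync is EOF. The class name, the method name, and the `syncPositions` and `rowCounts` arrays are hypothetical and are not part of the PR.

```java
final class StartingRowPosSketch {
  /**
   * Hypothetical model: syncPositions[i] is the offset of the sync that precedes block i,
   * and rowCounts[i] is the number of rows in block i. syncPositions has one extra entry
   * for the trailing sync that is followed only by EOF.
   */
  public static long startingRowPos(long[] syncPositions, long[] rowCounts, long start) {
    long totalRows = 0;
    int block = 0;

    // a block belongs to an earlier split when the sync before it begins before start
    while (syncPositions[block] < start) {
      if (block >= rowCounts.length) {
        // trailing sync: there is no row count to read after it, only EOF
        break;
      }

      totalRows += rowCounts[block];
      block += 1;
    }

    return totalRows;
  }

  public static void main(String[] args) {
    long[] syncPositions = {100L, 300L, 500L, 700L};
    long[] rowCounts = {10L, 20L, 30L};

    System.out.println(startingRowPos(syncPositions, rowCounts, 300L)); // start on a sync
    System.out.println(startingRowPos(syncPositions, rowCounts, 301L)); // start just past it
    System.out.println(startingRowPos(syncPositions, rowCounts, 701L)); // start past the last sync
  }
}
```

In this model, a `start` equal to a sync offset does not count the block after that sync, while a `start` one byte later does. A `start` past the trailing sync returns the total row count instead of trying to read a row count beyond the end of the file.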






[GitHub] [iceberg] rdblue commented on a change in pull request #1222: Add Avro row position reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222#discussion_r459626570



##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroIO.java
##########
@@ -131,4 +144,55 @@ public boolean markSupported() {
       return stream.markSupported();
     }
   }
+
+  static long findStartingRowPos(Supplier<SeekableInputStream> open, long start) {
+    try (SeekableInputStream in = open.get()) {
+      // use a direct decoder that will not buffer so the position of the input stream is accurate
+      BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
+
+      // an Avro file's layout looks like this:
+      //   header|block|block|...
+      // the header contains:
+      //   magic|string-map|sync
+      // each block consists of:
+      //   row-count|compressed-size-in-bytes|block-bytes|sync
+
+      // it is necessary to read the header here because this is the only way to get the expected file sync bytes
+      byte[] magic = MAGIC_READER.read(decoder, null);
+      if (!Arrays.equals(AVRO_MAGIC, magic)) {
+        throw new InvalidAvroMagicException("Not an Avro file");
+      }
+
+      META_READER.read(decoder, null); // ignore the file metadata, it isn't needed
+      byte[] fileSync = SYNC_READER.read(decoder, null);
+
+      // the while loop reads row counts and seeks past the block bytes until the next sync pos is >= start, which
+      // indicates that the next sync is the start of the split.
+      byte[] blockSync = new byte[16];
+      long totalRows = 0;
+      long nextSyncPos = in.getPos();
+
+      while (nextSyncPos < start) {

Review comment:
       Yeah, this is a bit strange in Avro. If the `start` of a split is past the start of the sync, then the split will actually start from the next sync.
   
   To find the actual start of the split, not the start of the assigned range, Avro reads 16 bytes from `start`, then keeps reading one more byte at a time and checks whether the rolling 16 bytes match the sync. When `start` is `syncPos + 1`, nothing matches until the _next_ sync, even though the block that follows the first sync starts after `start`.
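
The rolling comparison described above can be sketched over an in-memory byte array. This is only an illustration, assuming a 16-byte sync marker and a toy file layout; `SyncScanSketch`, `findNextSync`, `sampleSync`, and `sampleFile` are hypothetical names, and this is not Avro's reader code.

```java
import java.util.Arrays;

final class SyncScanSketch {
  static final int SYNC_SIZE = 16;

  /**
   * Returns the offset of the first sync marker that begins at or after start, or -1 if
   * there is none. A sync that begins before start is never matched, even when most of
   * its bytes lie after start.
   */
  public static long findNextSync(byte[] file, byte[] sync, int start) {
    // slide a 16-byte window forward one byte at a time, like the rolling comparison
    for (int pos = start; pos + SYNC_SIZE <= file.length; pos += 1) {
      if (Arrays.equals(Arrays.copyOfRange(file, pos, pos + SYNC_SIZE), sync)) {
        return pos;
      }
    }
    return -1;
  }

  /** Builds a sync marker with the distinct byte values 1 through 16. */
  public static byte[] sampleSync() {
    byte[] sync = new byte[SYNC_SIZE];
    for (int i = 0; i < SYNC_SIZE; i += 1) {
      sync[i] = (byte) (i + 1);
    }
    return sync;
  }

  /** Builds a toy file: 4 zero bytes, a sync, 10 zero bytes, then a second sync. */
  public static byte[] sampleFile(byte[] sync) {
    byte[] file = new byte[4 + SYNC_SIZE + 10 + SYNC_SIZE];
    System.arraycopy(sync, 0, file, 4, SYNC_SIZE);
    System.arraycopy(sync, 0, file, 30, SYNC_SIZE);
    return file;
  }

  public static void main(String[] args) {
    byte[] sync = sampleSync();
    byte[] file = sampleFile(sync);

    System.out.println(findNextSync(file, sync, 4)); // start is exactly syncPos
    System.out.println(findNextSync(file, sync, 5)); // start is syncPos + 1
  }
}
```

With `start` at offset 4 the first sync is found. With `start` at offset 5 (`syncPos + 1`) the scan skips to the sync at offset 30, so the block between the two syncs is left to the previous split.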






[GitHub] [iceberg] rdblue commented on a change in pull request #1222: Add Avro row position reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222#discussion_r459627243



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
##########
@@ -582,13 +585,37 @@ protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Ma
         if (idToConstant.containsKey(field.fieldId())) {
           positionList.add(pos);
           constantList.add(idToConstant.get(field.fieldId()));
+        } else if (field.fieldId() == MetadataColumns.ROW_POSITION.fieldId()) {
+          // replace the _pos field reader with a position reader
+          // only if the position reader is set and this is a top-level field

Review comment:
       This is out of date. I had to move the check because the position reader is set after this constructor. I'll update the comment.






[GitHub] [iceberg] rdblue commented on a change in pull request #1222: Add Avro row position reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222#discussion_r457772196



##########
File path: core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
##########
@@ -117,6 +126,7 @@ private ReadBuilder(Map<Integer, ?> idToConstant) {
     @Override
     public ValueReader<?> record(Types.StructType struct, Schema record,
                                  List<String> names, List<ValueReader<?>> fields) {
+

Review comment:
       Should remove this before committing.



