Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/23 06:55:41 UTC

[GitHub] [iceberg] shardulm94 commented on a change in pull request #1222: Add Avro row position reader

shardulm94 commented on a change in pull request #1222:
URL: https://github.com/apache/iceberg/pull/1222#discussion_r459239292



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
##########
@@ -582,13 +585,37 @@ protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Ma
         if (idToConstant.containsKey(field.fieldId())) {
           positionList.add(pos);
           constantList.add(idToConstant.get(field.fieldId()));
+        } else if (field.fieldId() == MetadataColumns.ROW_POSITION.fieldId()) {
+          // replace the _pos field reader with a position reader
+          // only if the position reader is set and this is a top-level field

Review comment:
       I'm not sure I understand this part of the comment:
   > only if the position reader is set
   
   reader is set where?
   > and this is a top-level field
   
   I don't see where the top-level field restriction comes from
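
For context, the "position reader" discussed here is a reader for the _pos metadata column that synthesizes the row's position in the file rather than decoding a value from the Avro data. Below is a minimal sketch of what such a reader could look like; the class name, the supplier-based wiring, and the assumption that ValueReader exposes read(Decoder, Object) are illustrative, not necessarily what the PR actually does.

```java
import java.io.IOException;
import java.util.function.LongSupplier;

import org.apache.avro.io.Decoder;
import org.apache.iceberg.avro.ValueReader;

// Illustrative sketch only: a reader for the _pos metadata column that returns
// the current row position instead of decoding anything from the Avro stream.
class RowPositionReader implements ValueReader<Long> {
  // supplies the position of the row currently being read; the enclosing file
  // reader would advance this while iterating rows, starting from the value
  // returned by AvroIO.findStartingRowPos for the split
  private LongSupplier rowPosition = () -> -1L;

  void setRowPositionSupplier(LongSupplier supplier) {
    this.rowPosition = supplier;
  }

  @Override
  public Long read(Decoder decoder, Object reuse) throws IOException {
    // nothing is consumed from the decoder: the value is synthesized
    return rowPosition.getAsLong();
  }
}
```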

##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroIO.java
##########
@@ -131,4 +144,55 @@ public boolean markSupported() {
       return stream.markSupported();
     }
   }
+
+  static long findStartingRowPos(Supplier<SeekableInputStream> open, long start) {
+    try (SeekableInputStream in = open.get()) {
+      // use a direct decoder that will not buffer so the position of the input stream is accurate
+      BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
+
+      // an Avro file's layout looks like this:
+      //   header|block|block|...
+      // the header contains:
+      //   magic|string-map|sync
+      // each block consists of:
+      //   row-count|compressed-size-in-bytes|block-bytes|sync
+
+      // it is necessary to read the header here because this is the only way to get the expected file sync bytes
+      byte[] magic = MAGIC_READER.read(decoder, null);
+      if (!Arrays.equals(AVRO_MAGIC, magic)) {
+        throw new InvalidAvroMagicException("Not an Avro file");
+      }
+
+      META_READER.read(decoder, null); // ignore the file metadata, it isn't needed
+      byte[] fileSync = SYNC_READER.read(decoder, null);
+
+      // the while loop reads row counts and seeks past the block bytes until the next sync pos is >= start, which
+      // indicates that the next sync is the start of the split.
+      byte[] blockSync = new byte[16];
+      long totalRows = 0;
+      long nextSyncPos = in.getPos();
+
+      while (nextSyncPos < start) {

Review comment:
       Not sure if this is a valid start state, but I think there can be an edge case here:
   ```
   row-count|compressed-size-in-bytes|block-bytes|sync|EOF
                                                    ^
                                                    |
                                                  start
   ```
    In this case it will seek to and read the sync, and then try to read the next block's row count even though the stream is already at EOF.
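
For reference, here is a rough sketch of how the rest of the loop quoted above might look, with an explicit guard for the sync-followed-by-EOF situation described in this comment. It assumes the variables declared in the quoted hunk (in, decoder, fileSync, blockSync, totalRows, nextSyncPos, start) plus imports of java.io.EOFException and java.util.Arrays; it is an illustration of the edge case, not the PR's actual code.

```java
      while (nextSyncPos < start) {
        if (nextSyncPos != in.getPos()) {
          // seek to the sync that terminates the previous block and consume it
          in.seek(nextSyncPos);
          decoder.readFixed(blockSync);
          if (!Arrays.equals(fileSync, blockSync)) {
            // unchecked exception used here only to keep the sketch compiling
            throw new IllegalStateException("Invalid sync at position " + nextSyncPos);
          }
        }

        long rowCount;
        try {
          // row count of the next block
          rowCount = decoder.readLong();
        } catch (EOFException e) {
          // edge case: the last sync is immediately followed by EOF, so there
          // is no next block header to read
          break;
        }
        long compressedSize = decoder.readLong();  // compressed-size-in-bytes

        totalRows += rowCount;
        // the next sync marker sits right after this block's bytes
        nextSyncPos = in.getPos() + compressedSize;
      }

      return totalRows;
```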



