You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to dev@parquet.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/09/11 11:53:00 UTC

[jira] [Commented] (PARQUET-1389) Improve value skipping at page synchronization

    [ https://issues.apache.org/jira/browse/PARQUET-1389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16610486#comment-16610486 ] 

ASF GitHub Bot commented on PARQUET-1389:
-----------------------------------------

gszadovszky closed pull request #514: PARQUET-1389: Improve value skipping at page synchronization
URL: https://github.com/apache/parquet-mr/pull/514
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java
index 0af85c701..76822367d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java
@@ -72,6 +72,14 @@
      */
     abstract void skip();
 
+    /**
+     * Skips n values from the underlying page
+     *
+     * @param n
+     *          the number of values to be skipped
+     */
+    abstract void skip(int n);
+
     /**
      * write current value to converter
      */
@@ -163,6 +171,10 @@ void read() {
           public void skip() {
             dataColumn.skip();
           }
+          @Override
+          void skip(int n) {
+            dataColumn.skip(n);
+          }
           public int getDictionaryId() {
             return dictionaryId;
           }
@@ -203,6 +215,11 @@ public void skip() {
             current = 0;
             dataColumn.skip();
           }
+          @Override
+          void skip(int n) {
+            current = 0;
+            dataColumn.skip(n);
+          }
           public float getFloat() {
             return current;
           }
@@ -222,6 +239,11 @@ public void skip() {
             current = 0;
             dataColumn.skip();
           }
+          @Override
+          void skip(int n) {
+            current = 0;
+            dataColumn.skip(n);
+          }
           public double getDouble() {
             return current;
           }
@@ -242,6 +264,11 @@ public void skip() {
             dataColumn.skip();
           }
           @Override
+          void skip(int n) {
+            current = 0;
+            dataColumn.skip(n);
+          }
+          @Override
           public int getInteger() {
             return current;
           }
@@ -262,6 +289,11 @@ public void skip() {
             dataColumn.skip();
           }
           @Override
+          void skip(int n) {
+            current = 0;
+            dataColumn.skip(n);
+          }
+          @Override
           public long getLong() {
             return current;
           }
@@ -291,6 +323,11 @@ public void skip() {
             dataColumn.skip();
           }
           @Override
+          void skip(int n) {
+            current = false;
+            dataColumn.skip(n);
+          }
+          @Override
           public boolean getBoolean() {
             return current;
           }
@@ -311,6 +348,11 @@ public void skip() {
             dataColumn.skip();
           }
           @Override
+          void skip(int n) {
+            current = null;
+            dataColumn.skip(n);
+          }
+          @Override
           public Binary getBinary() {
             return current;
           }
@@ -511,6 +553,7 @@ public int getCurrentDefinitionLevel() {
 
   private void checkRead() {
     int rl, dl;
+    int skipValues = 0;
     for (;;) {
       if (isPageFullyConsumed()) {
         if (isFullyConsumed()) {
@@ -519,6 +562,7 @@ private void checkRead() {
           return;
         }
         readPage();
+        skipValues = 0;
       }
       rl = repetitionLevelColumn.nextInt();
       dl = definitionLevelColumn.nextInt();
@@ -527,9 +571,10 @@ private void checkRead() {
         break;
       }
       if (dl == maxDefinitionLevel) {
-        binding.skip();
+        ++skipValues;
       }
     }
+    binding.skip(skipValues);
     repetitionLevel = rl;
     definitionLevel = dl;
   }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
index 57326607b..3167d82f7 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
@@ -109,5 +109,17 @@ public long readLong() {
    * Skips the next value in the page
    */
   abstract public void skip();
+
+  /**
+   * Skips the next n value in the page
+   *
+   * @param n
+   *          the number of values to be skipped
+   */
+  public void skip(int n) {
+    for (int i = 0; i < n; ++i) {
+      skip();
+    }
+  }
 }
 
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
index dceaa526f..58e02f276 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
@@ -88,6 +88,14 @@ public void skip() {
     valuesRead++;
   }
 
+  @Override
+  public void skip(int n) {
+    // checkRead() is invoked before incrementing valuesRead so increase valuesRead size in 2 steps
+    valuesRead += n - 1;
+    checkRead();
+    ++valuesRead;
+  }
+
   @Override
   public int readInteger() {
     // TODO: probably implement it separately
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
index 1a2ccb9b5..4dbbcb564 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
@@ -20,8 +20,6 @@
 
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
@@ -64,7 +62,15 @@ public Binary readBytes() {
 
   @Override
   public void skip() {
-    int length = lengthReader.readInteger();
+    skip(1);
+  }
+
+  @Override
+  public void skip(int n) {
+    int length = 0;
+    for (int i = 0; i < n; ++i) {
+      length += lengthReader.readInteger();
+    }
     try {
       in.skipFully(length);
     } catch (IOException e) {
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
index 15ed43438..631c9084d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
@@ -19,7 +19,6 @@
 package org.apache.parquet.column.values.plain;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.ParquetDecodingException;
@@ -51,8 +50,13 @@ public Binary readBytes() {
 
   @Override
   public void skip() {
+    skip(1);
+  }
+
+  @Override
+  public void skip(int n) {
     try {
-      in.skipFully(length);
+      in.skipFully(n * length);
     } catch (IOException | RuntimeException e) {
       throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e);
     }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
index f576528a9..127817eb0 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
@@ -41,14 +41,26 @@ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IO
     this.in = new LittleEndianDataInputStream(stream.remainingStream());
   }
 
+  @Override
+  public void skip() {
+    skip(1);
+  }
+
+  void skipBytesFully(int n) throws IOException {
+    int skipped = 0;
+    while (skipped < n) {
+      skipped += in.skipBytes(n - skipped);
+    }
+  }
+
   public static class DoublePlainValuesReader extends PlainValuesReader {
 
     @Override
-    public void skip() {
+    public void skip(int n) {
       try {
-        in.skipBytes(8);
+        skipBytesFully(n * 8);
       } catch (IOException e) {
-        throw new ParquetDecodingException("could not skip double", e);
+        throw new ParquetDecodingException("could not skip " + n + " double values", e);
       }
     }
 
@@ -65,11 +77,11 @@ public double readDouble() {
   public static class FloatPlainValuesReader extends PlainValuesReader {
 
     @Override
-    public void skip() {
+    public void skip(int n) {
       try {
-        in.skipBytes(4);
+        skipBytesFully(n * 4);
       } catch (IOException e) {
-        throw new ParquetDecodingException("could not skip float", e);
+        throw new ParquetDecodingException("could not skip " + n + " floats", e);
       }
     }
 
@@ -86,11 +98,11 @@ public float readFloat() {
   public static class IntegerPlainValuesReader extends PlainValuesReader {
 
     @Override
-    public void skip() {
+    public void skip(int n) {
       try {
-        in.skipBytes(4);
+        in.skipBytes(n * 4);
       } catch (IOException e) {
-        throw new ParquetDecodingException("could not skip int", e);
+        throw new ParquetDecodingException("could not skip " + n + " ints", e);
       }
     }
 
@@ -107,11 +119,11 @@ public int readInteger() {
   public static class LongPlainValuesReader extends PlainValuesReader {
 
     @Override
-    public void skip() {
+    public void skip(int n) {
       try {
-        in.skipBytes(8);
+        in.skipBytes(n * 8);
       } catch (IOException e) {
-        throw new ParquetDecodingException("could not skip long", e);
+        throw new ParquetDecodingException("could not skip " + n + " longs", e);
       }
     }
 
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
index fe00de999..8039cf9e4 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
@@ -19,8 +19,6 @@
 package org.apache.parquet.column.values.rle;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.values.ValuesReader;
 
@@ -43,4 +41,8 @@ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IO
   public void skip() {
   }
 
+  @Override
+  public void skip(int n) {
+  }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Improve value skipping at page synchronization
> ----------------------------------------------
>
>                 Key: PARQUET-1389
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1389
>             Project: Parquet
>          Issue Type: Sub-task
>            Reporter: Gabor Szadovszky
>            Assignee: Gabor Szadovszky
>            Priority: Minor
>              Labels: pull-request-available
>
> Currently, values are skipped one by one during page synchronization, even though some encodings (e.g. plain) allow several values to be skipped at once.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)