You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@parquet.apache.org by GitBox <gi...@apache.org> on 2021/02/18 12:18:13 UTC

[GitHub] [parquet-mr] fschmalzel opened a new pull request #871: PARQUET-1982: Random access to row groups in ParquetFileReader

fschmalzel opened a new pull request #871:
URL: https://github.com/apache/parquet-mr/pull/871


   Adds a method readRowGroup(BlockMetaData) to allow random access to
   PageReadStores via BlockMetaData, which can be obtained using the
   getRowGroups() method.
   
   This is similar to the existing method
   getDictionaryReader(BlockMetaData)
   that already exists.
   
   With random access the reader can be reused if for example someone
   needs to go back a row group. This would improve performance
   because we don't need to open the file again and read the metadata.
   
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [x ] My PR addresses the following [Parquet Jira](https://issues.apache.org/jira/browse/PARQUET-1982) issue.
   
   ### Tests
   
   - [x ] My PR adds the following unit tests:
    - TestParquetReaderRandomAccess
   
   ### Commits
   
   - [x ] My commits all reference Jira issues in their subject lines. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [x ] In case of new functionality, my PR adds documentation that describes how to use it.
     - All the public functions and the classes in the PR contain Javadoc that explain what it does
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [parquet-mr] fschmalzel commented on pull request #871: PARQUET-1982: Random access to row groups in ParquetFileReader

Posted by GitBox <gi...@apache.org>.
fschmalzel commented on pull request #871:
URL: https://github.com/apache/parquet-mr/pull/871#issuecomment-811886333


   First of sorry for the late answer.
   
   We use parquet-data to display graphs. We needed the optimization to go back pages if a user scrolls to the left or zooms out. I currently don't have any concrete numbers but it was a lot more performant. Going from practically unusably slow to reasonably fast, comparable to our legacy file format. 
   
   I plan to go and get some more precise numbers for our use case soon.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #871: PARQUET-1982: Random access to row groups in ParquetFileReader

Posted by GitBox <gi...@apache.org>.
gszadovszky commented on a change in pull request #871:
URL: https://github.com/apache/parquet-mr/pull/871#discussion_r578525890



##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##########
@@ -888,20 +888,44 @@ public void appendTo(ParquetFileWriter writer) throws IOException {
     writer.appendRowGroups(f, blocks, true);
   }
 
+  /**
+   * Reads all the columns requested from the row group at the specified block.
+   * @throws IOException if an error occurs while reading
+   * @return the PageReadStore which can provide PageReaders for each column.
+   */
+  public PageReadStore readRowGroup(BlockMetaData block) throws IOException {

Review comment:
       You have created the specific method to read a row group based on the specified metadata (instead simply reading the next row group). This seems fine to me. Meanwhile, there is another method for reading the next row group: `readNextFilteredRowGroup`. This method was created to support the column indexes feature and meanwhile to be backward compatible with the old readers.
   To take advantage of the column indexes feature (and skip the unnecessary pages to get read) and also for symmetry I would suggest creating another specific method: `readFilteredRowGroup(BlockMetaData)`.

##########
File path: parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.apache.parquet.column.Encoding.BIT_PACKED;
+import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.junit.Assert.*;
+
+public class TestParquetReaderRandomAccess {
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+
+  private static final MessageType SCHEMA = MessageTypeParser.parseMessageType("" +
+    "message m {" +
+    "  required group a {" +
+    "    required binary b;" +
+    "  }" +
+    "  required group c {" +
+    "    required int64 d;" +
+    "  }" +
+    "}");
+  private static final String[] PATH1 = {"a", "b"};
+  private static final ColumnDescriptor C1 = SCHEMA.getColumnDescription(PATH1);
+  private static final String[] PATH2 = {"c", "d"};
+  private static final ColumnDescriptor C2 = SCHEMA.getColumnDescription(PATH2);
+
+  private static final byte[] BYTES1 = { 0, 1, 2, 3 };
+  private static final byte[] BYTES2 = { 1, 2, 3, 4 };
+  private static final byte[] BYTES3 = { 2, 3, 4, 5 };
+  private static final byte[] BYTES4 = { 3, 4, 5, 6 };
+  private static final CompressionCodecName CODEC = CompressionCodecName.UNCOMPRESSED;
+
+  private static final org.apache.parquet.column.statistics.Statistics<?> EMPTY_STATS = org.apache.parquet.column.statistics.Statistics
+    .getBuilderForReading(Types.required(PrimitiveType.PrimitiveTypeName.BINARY).named("test_binary")).build();
+
+  private void writeDataToFile(Path path) throws IOException {
+    Configuration configuration = new Configuration();
+
+    ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path);
+    w.start();
+    w.startBlock(3);
+    w.startColumn(C1, 5, CODEC);
+    w.writeDataPage(2, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.startColumn(C2, 6, CODEC);
+    w.writeDataPage(2, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(1, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.startBlock(4);
+    w.startColumn(C1, 7, CODEC);
+    w.writeDataPage(7, 4, BytesInput.from(BYTES3), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.startColumn(C2, 8, CODEC);
+    w.writeDataPage(8, 4, BytesInput.from(BYTES4), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.end(new HashMap<String, String>());
+  }
+
+  @Test
+  public void testReadRandom() throws IOException {
+    File testFile = temp.newFile();
+    testFile.delete();
+    Path path = new Path(testFile.toURI());
+    Configuration configuration = new Configuration();
+    ParquetReadOptions options = ParquetReadOptions.builder().build();
+
+    writeDataToFile(path);
+
+    ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(path, configuration), options);
+    List<BlockMetaData> blocks = reader.getRowGroups();
+
+    // Randomize indexes
+    List<Integer> indexes = new ArrayList<>();
+    for (int i = 0; i < blocks.size(); i++) {
+      for (int j = 0; j < 4; j++) {
+        indexes.add(i);
+      }
+    }
+
+    Collections.shuffle(indexes);

Review comment:
       This way the order of the indexes is not deterministic. Usually we don't like this in unit tests because in case of failure we cannot reproduce the test. I would suggest invoking `shuffle` with a pre-created `Random` instance with a fix seed.

##########
File path: parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.apache.parquet.column.Encoding.BIT_PACKED;
+import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.junit.Assert.*;
+
+public class TestParquetReaderRandomAccess {
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+
+  private static final MessageType SCHEMA = MessageTypeParser.parseMessageType("" +
+    "message m {" +
+    "  required group a {" +
+    "    required binary b;" +
+    "  }" +
+    "  required group c {" +
+    "    required int64 d;" +
+    "  }" +
+    "}");
+  private static final String[] PATH1 = {"a", "b"};
+  private static final ColumnDescriptor C1 = SCHEMA.getColumnDescription(PATH1);
+  private static final String[] PATH2 = {"c", "d"};
+  private static final ColumnDescriptor C2 = SCHEMA.getColumnDescription(PATH2);
+
+  private static final byte[] BYTES1 = { 0, 1, 2, 3 };
+  private static final byte[] BYTES2 = { 1, 2, 3, 4 };
+  private static final byte[] BYTES3 = { 2, 3, 4, 5 };
+  private static final byte[] BYTES4 = { 3, 4, 5, 6 };
+  private static final CompressionCodecName CODEC = CompressionCodecName.UNCOMPRESSED;
+
+  private static final org.apache.parquet.column.statistics.Statistics<?> EMPTY_STATS = org.apache.parquet.column.statistics.Statistics
+    .getBuilderForReading(Types.required(PrimitiveType.PrimitiveTypeName.BINARY).named("test_binary")).build();
+
+  private void writeDataToFile(Path path) throws IOException {
+    Configuration configuration = new Configuration();
+
+    ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path);
+    w.start();
+    w.startBlock(3);
+    w.startColumn(C1, 5, CODEC);
+    w.writeDataPage(2, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.startColumn(C2, 6, CODEC);
+    w.writeDataPage(2, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(1, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.startBlock(4);
+    w.startColumn(C1, 7, CODEC);
+    w.writeDataPage(7, 4, BytesInput.from(BYTES3), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.startColumn(C2, 8, CODEC);
+    w.writeDataPage(8, 4, BytesInput.from(BYTES4), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.end(new HashMap<String, String>());
+  }
+
+  @Test
+  public void testReadRandom() throws IOException {
+    File testFile = temp.newFile();
+    testFile.delete();
+    Path path = new Path(testFile.toURI());
+    Configuration configuration = new Configuration();
+    ParquetReadOptions options = ParquetReadOptions.builder().build();
+
+    writeDataToFile(path);
+
+    ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(path, configuration), options);
+    List<BlockMetaData> blocks = reader.getRowGroups();
+
+    // Randomize indexes
+    List<Integer> indexes = new ArrayList<>();
+    for (int i = 0; i < blocks.size(); i++) {
+      for (int j = 0; j < 4; j++) {
+        indexes.add(i);
+      }
+    }
+
+    Collections.shuffle(indexes);
+
+    for (int index : indexes) {
+      PageReadStore pages = reader.readRowGroup(blocks.get(index));
+      validatePages(pages, index);
+    }
+  }
+
+  @Test
+  public void testReadRandomAndSequential() throws IOException {
+    File testFile = temp.newFile();
+    testFile.delete();
+    Path path = new Path(testFile.toURI());
+    Configuration configuration = new Configuration();
+    ParquetReadOptions options = ParquetReadOptions.builder().build();
+
+    writeDataToFile(path);
+
+    ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(path, configuration), options);
+    List<BlockMetaData> blocks = reader.getRowGroups();
+
+    // Randomize indexes
+    List<Integer> indexes = new ArrayList<>();
+    for (int i = 0; i < blocks.size(); i++) {
+      for (int j = 0; j < 4; j++) {
+        indexes.add(i);
+      }
+    }
+
+    Collections.shuffle(indexes);

Review comment:
       Same as above




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [parquet-mr] shangxinli commented on pull request #871: PARQUET-1982: Random access to row groups in ParquetFileReader

Posted by GitBox <gi...@apache.org>.
shangxinli commented on pull request #871:
URL: https://github.com/apache/parquet-mr/pull/871#issuecomment-819523621


   @gszadovszky No I don't have other comments. I didn't get much time to review this code. If you don't have other comments, feel free to approve. If you want me to have a look in more detail, I can do probably do it on the weekend. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #871: PARQUET-1982: Random access to row groups in ParquetFileReader

Posted by GitBox <gi...@apache.org>.
gszadovszky commented on a change in pull request #871:
URL: https://github.com/apache/parquet-mr/pull/871#discussion_r580349408



##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##########
@@ -920,22 +946,52 @@ public PageReadStore readNextRowGroup() throws IOException {
       }
     }
     // actually read all the chunks
-    ChunkListBuilder builder = new ChunkListBuilder();
+    ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
     for (ConsecutivePartList consecutiveChunks : allParts) {
       consecutiveChunks.readAll(f, builder);
     }
     for (Chunk chunk : builder.build()) {
-      readChunkPages(chunk, block);
+      readChunkPages(chunk, block, rowGroup);
     }
 
-    // avoid re-reading bytes the dictionary reader is used after this call
-    if (nextDictionaryReader != null) {
-      nextDictionaryReader.setRowGroup(currentRowGroup);
+    return rowGroup;
+  }
+
+  /**
+   * Reads all the columns requested from the specified row group. It may skip specific pages based on the column
+   * indexes according to the actual filter. As the rows are not aligned among the pages of the different columns row
+   * synchronization might be required. See the documentation of the class SynchronizingColumnReader for details.
+   *
+   * @param blockIndex the index of the requested block
+   * @return the PageReadStore which can provide PageReaders for each column or null if there are no rows in this block
+   * @throws IOException if an error occurs while reading
+   */
+  public PageReadStore readFilteredRowGroup(int blockIndex) throws IOException {

Review comment:
       What is the reason behind you are using a `BlockMetaData` argument for `readRowGroup` while you use the index of the row group here? I think, both should work similarly.

##########
File path: parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.*;
+import org.apache.parquet.statistics.DataGenerationContext;
+import org.apache.parquet.statistics.RandomValues;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.*;
+
+public class TestParquetReaderRandomAccess {

Review comment:
       As far as I could understand you only check the first page of the same column for every row groups. I am not sure if it is enough or not but in this case the other columns are completely useless. I think, at least all the pages of the same column should be checked.
   
   Please, also test `readFilteredRowGroup`. As you do not decode the pages you do not need to test anything differently but we need to cover all new paths.

##########
File path: parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.*;
+import org.apache.parquet.statistics.DataGenerationContext;
+import org.apache.parquet.statistics.RandomValues;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.*;
+
+public class TestParquetReaderRandomAccess {
+  private static final int KILOBYTE = 1 << 10;
+  private static final long RANDOM_SEED = 7174252115631550700L;
+
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+
+  @Test
+  public void test() throws IOException {
+    Random random = new Random(RANDOM_SEED);
+
+    File file = temp.newFile("test_file.parquet");
+    file.delete();
+
+    int blockSize = 500 * KILOBYTE;
+    int pageSize = 20 * KILOBYTE;
+
+    List<DataContext> contexts = new ArrayList<>();
+
+    for (boolean enableDictionary : new boolean[]{false, true}) {
+      for (WriterVersion writerVersion : new WriterVersion[]{WriterVersion.PARQUET_1_0, WriterVersion.PARQUET_2_0}) {

Review comment:
       Of course it is not a problem but I think with/without dictionary and V1/V2 pages are irrelevant for this test




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [parquet-mr] shangxinli commented on pull request #871: PARQUET-1982: Random access to row groups in ParquetFileReader

Posted by GitBox <gi...@apache.org>.
shangxinli commented on pull request #871:
URL: https://github.com/apache/parquet-mr/pull/871#issuecomment-800333143


   Do you have some benchmarking test numbers to show how much gain we could have? 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [parquet-mr] gszadovszky merged pull request #871: PARQUET-1982: Random access to row groups in ParquetFileReader

Posted by GitBox <gi...@apache.org>.
gszadovszky merged pull request #871:
URL: https://github.com/apache/parquet-mr/pull/871


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [parquet-mr] fschmalzel commented on a change in pull request #871: PARQUET-1982: Random access to row groups in ParquetFileReader

Posted by GitBox <gi...@apache.org>.
fschmalzel commented on a change in pull request #871:
URL: https://github.com/apache/parquet-mr/pull/871#discussion_r581859935



##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##########
@@ -920,22 +946,52 @@ public PageReadStore readNextRowGroup() throws IOException {
       }
     }
     // actually read all the chunks
-    ChunkListBuilder builder = new ChunkListBuilder();
+    ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
     for (ConsecutivePartList consecutiveChunks : allParts) {
       consecutiveChunks.readAll(f, builder);
     }
     for (Chunk chunk : builder.build()) {
-      readChunkPages(chunk, block);
+      readChunkPages(chunk, block, rowGroup);
     }
 
-    // avoid re-reading bytes the dictionary reader is used after this call
-    if (nextDictionaryReader != null) {
-      nextDictionaryReader.setRowGroup(currentRowGroup);
+    return rowGroup;
+  }
+
+  /**
+   * Reads all the columns requested from the specified row group. It may skip specific pages based on the column
+   * indexes according to the actual filter. As the rows are not aligned among the pages of the different columns row
+   * synchronization might be required. See the documentation of the class SynchronizingColumnReader for details.
+   *
+   * @param blockIndex the index of the requested block
+   * @return the PageReadStore which can provide PageReaders for each column or null if there are no rows in this block
+   * @throws IOException if an error occurs while reading
+   */
+  public PageReadStore readFilteredRowGroup(int blockIndex) throws IOException {

Review comment:
       I agree that using the index is cleaner. I added methods using int instead of BlockMetaData for existing API.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [parquet-mr] gszadovszky commented on pull request #871: PARQUET-1982: Random access to row groups in ParquetFileReader

Posted by GitBox <gi...@apache.org>.
gszadovszky commented on pull request #871:
URL: https://github.com/apache/parquet-mr/pull/871#issuecomment-819533194


   Thanks @shangxinli. I don't feel this change is a big deal as it does not change the current behavior but only adds new options to read row groups. I've already approved it so I'll merge it soon.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [parquet-mr] gszadovszky commented on pull request #871: PARQUET-1982: Random access to row groups in ParquetFileReader

Posted by GitBox <gi...@apache.org>.
gszadovszky commented on pull request #871:
URL: https://github.com/apache/parquet-mr/pull/871#issuecomment-819376709


   Thanks a lot for the performance test @fschmalzel!
   @shangxinli, any other comments?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #871: PARQUET-1982: Random access to row groups in ParquetFileReader

Posted by GitBox <gi...@apache.org>.
gszadovszky commented on a change in pull request #871:
URL: https://github.com/apache/parquet-mr/pull/871#discussion_r580973495



##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##########
@@ -920,22 +946,52 @@ public PageReadStore readNextRowGroup() throws IOException {
       }
     }
     // actually read all the chunks
-    ChunkListBuilder builder = new ChunkListBuilder();
+    ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
     for (ConsecutivePartList consecutiveChunks : allParts) {
       consecutiveChunks.readAll(f, builder);
     }
     for (Chunk chunk : builder.build()) {
-      readChunkPages(chunk, block);
+      readChunkPages(chunk, block, rowGroup);
     }
 
-    // avoid re-reading bytes the dictionary reader is used after this call
-    if (nextDictionaryReader != null) {
-      nextDictionaryReader.setRowGroup(currentRowGroup);
+    return rowGroup;
+  }
+
+  /**
+   * Reads all the columns requested from the specified row group. It may skip specific pages based on the column
+   * indexes according to the actual filter. As the rows are not aligned among the pages of the different columns row
+   * synchronization might be required. See the documentation of the class SynchronizingColumnReader for details.
+   *
+   * @param blockIndex the index of the requested block
+   * @return the PageReadStore which can provide PageReaders for each column or null if there are no rows in this block
+   * @throws IOException if an error occurs while reading
+   */
+  public PageReadStore readFilteredRowGroup(int blockIndex) throws IOException {

Review comment:
       It depends on how do you plan to use it. If you don't plan to cache the related metadata outside of the reader I think it is more clean to use the index because it does not suggest that you may use any arbitrary metadata but the ones are in the file. Meanwhile, you may want to select specific row groups to be read based on the metadata so you would already have the related object. In this case it is easier to simply pass it instead of checking for the index.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [parquet-mr] fschmalzel commented on pull request #871: PARQUET-1982: Random access to row groups in ParquetFileReader

Posted by GitBox <gi...@apache.org>.
fschmalzel commented on pull request #871:
URL: https://github.com/apache/parquet-mr/pull/871#issuecomment-818724064


   Tested random access with a 351 MB parquet file with zstd and dictionary enabled. The file has 321 row groups.
   Using the sequential method to read the randomized indexes took around 87319 milliseconds.
   Using the sequential method without this commit took around 86311 milliseconds.
   Using the random access method took around 58941 milliseconds.
   
   Note: This only improves randomized access. For sequential access both methods should take about the same time. Nevertheless the sequential method is not removed, so that should not be a problem.
   
   This should all be taken with a grain of salt, as this was tested without something like jmh. https://github.com/openjdk/jmh#java-microbenchmark-harness-jmh
   
   Unfortunately i cannot upload the tested file somewhere. If you want me to test a different file i will gladly do so.
   
   Quick and dirty test:
   
   ```java
   package org.apache.parquet;
   
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.Path;
   import org.apache.parquet.column.page.PageReadStore;
   import org.apache.parquet.example.data.Group;
   import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
   import org.apache.parquet.hadoop.ParquetFileReader;
   import org.apache.parquet.hadoop.util.HadoopInputFile;
   import org.apache.parquet.io.ColumnIOFactory;
   import org.apache.parquet.io.MessageColumnIO;
   import org.apache.parquet.io.RecordReader;
   import org.apache.parquet.schema.MessageType;
   import org.junit.Test;
   
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.Random;
   
   public class ReadTest {
   
     private static final long RANDOM_SEED = 7174252115631550700L;
     private static final String FILE_PATH = "C:\\Users\\fes\\Desktop\\test.parquet";
     private static final int ROUNDS = 4;
   
     @Test
     public void testSequential() throws IOException {
       Random random = new Random(RANDOM_SEED);
   
       Path file = new Path(FILE_PATH);
       Configuration configuration = new Configuration();
       ParquetReadOptions options = ParquetReadOptions.builder().build();
   
       ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(file, configuration), options);
       MessageType schema = reader.getFileMetaData().getSchema();
       MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
       GroupRecordConverter converter = new GroupRecordConverter(schema);
   
       int blockAmount = reader.getRowGroups().size();
   
       List<Integer> indexes = new ArrayList<>();
       for (int j = 0; j < ROUNDS; j++) {
         for (int i = 0; i < blockAmount; i++) {
           indexes.add(i);
         }
       }
   
       Collections.shuffle(indexes, random);
   
       long start = System.currentTimeMillis();
   
       int currentRowGroup = -1;
   
       for (int index : indexes) {
         if (index <= currentRowGroup) {
           try {
             reader.close();
           } catch (Exception ignored) {
           }
   
           reader = new ParquetFileReader(HadoopInputFile.fromPath(file, configuration), options);
           currentRowGroup = -1;
         }
         for (int i = 1; i < index - currentRowGroup; i++) {
           reader.skipNextRowGroup();
         }
         currentRowGroup = index;
         PageReadStore pages = reader.readNextRowGroup();
         long rowCount = pages.getRowCount();
         RecordReader<Group> recordReader = columnIO.getRecordReader(pages, converter);
         for (long i = 0; i < rowCount; i++) {
           recordReader.read();
         }
       }
   
       long stop = System.currentTimeMillis();
   
       try {
         reader.close();
       } catch (Exception ignored) {
       }
   
       long timeTaken = stop - start;
   
       System.out.printf("Sequential access took %d milliseconds%n", timeTaken);
     }
   
     @Test
     public void testRandom() throws IOException {
       Random random = new Random(RANDOM_SEED);
   
       Path file = new Path(FILE_PATH);
       Configuration configuration = new Configuration();
       ParquetReadOptions options = ParquetReadOptions.builder().build();
   
       ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(file, configuration), options);
       MessageType schema = reader.getFileMetaData().getSchema();
       MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
       GroupRecordConverter converter = new GroupRecordConverter(schema);
   
       int blockAmount = reader.getRowGroups().size();
   
       List<Integer> indexes = new ArrayList<>();
       for (int j = 0; j < ROUNDS; j++) {
         for (int i = 0; i < blockAmount; i++) {
           indexes.add(i);
         }
       }
   
       Collections.shuffle(indexes, random);
   
       long start = System.currentTimeMillis();
       for (int index : indexes) {
         PageReadStore pages = reader.readRowGroup(index);
         long rowCount = pages.getRowCount();
         RecordReader<Group> recordReader = columnIO.getRecordReader(pages, converter);
         for (long i = 0; i < rowCount; i++) {
           recordReader.read();
         }
       }
   
       long stop = System.currentTimeMillis();
   
       try {
         reader.close();
       } catch (Exception ignored) {
       }
   
       long timeTaken = stop - start;
   
       System.out.printf("Random access took %d milliseconds%n", timeTaken);
     }
     
   }
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [parquet-mr] fschmalzel commented on a change in pull request #871: PARQUET-1982: Random access to row groups in ParquetFileReader

Posted by GitBox <gi...@apache.org>.
fschmalzel commented on a change in pull request #871:
URL: https://github.com/apache/parquet-mr/pull/871#discussion_r580961322



##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##########
@@ -920,22 +946,52 @@ public PageReadStore readNextRowGroup() throws IOException {
       }
     }
     // actually read all the chunks
-    ChunkListBuilder builder = new ChunkListBuilder();
+    ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
     for (ConsecutivePartList consecutiveChunks : allParts) {
       consecutiveChunks.readAll(f, builder);
     }
     for (Chunk chunk : builder.build()) {
-      readChunkPages(chunk, block);
+      readChunkPages(chunk, block, rowGroup);
     }
 
-    // avoid re-reading bytes the dictionary reader is used after this call
-    if (nextDictionaryReader != null) {
-      nextDictionaryReader.setRowGroup(currentRowGroup);
+    return rowGroup;
+  }
+
+  /**
+   * Reads all the columns requested from the specified row group. It may skip specific pages based on the column
+   * indexes according to the actual filter. As the rows are not aligned among the pages of the different columns row
+   * synchronization might be required. See the documentation of the class SynchronizingColumnReader for details.
+   *
+   * @param blockIndex the index of the requested block
+   * @return the PageReadStore which can provide PageReaders for each column or null if there are no rows in this block
+   * @throws IOException if an error occurs while reading
+   */
+  public PageReadStore readFilteredRowGroup(int blockIndex) throws IOException {

Review comment:
       I think using BlockMetaData is better as it is similar to the existing getDictionaryReader(BlockMetaData). For the filtered row group, we need the index to access the RowRanges and ColumnIndexStore.
   
   https://github.com/apache/parquet-mr/pull/871/commits/453a6cc9ef6bda9bd963fab05e59ab6f51389dfa#diff-8da24c84aef62e6e836d073938f7843d289785baaeddf446f3afeae6d4ef4b10R983
   
   https://github.com/apache/parquet-mr/pull/871/commits/453a6cc9ef6bda9bd963fab05e59ab6f51389dfa#diff-8da24c84aef62e6e836d073938f7843d289785baaeddf446f3afeae6d4ef4b10R994
   
   What do you think is the better approach?
   Accept BlockMetaData and find the index in the list.
   -or-
   Change the other method signatures to also use indexes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org