You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2019/12/21 22:15:52 UTC

[GitHub] [incubator-iceberg] samarthjain opened a new pull request #710: Parquet changes for vectorized reads

samarthjain opened a new pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710
 
 
   @rdblue, @prodeezy, @danielcweeks

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] samarthjain commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
samarthjain commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361555670
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedReader.java
 ##########
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet.vectorized;
+
+import java.util.Map;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/**
+ * Interface for vectorized Iceberg readers.
+ */
+public interface VectorizedReader<T> {
+  T read();
+
+  void setRowGroupInfo(PageReadStore pages, DictionaryPageReadStore dictionaryPageReadStore,
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361028799
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetReader.java
 ##########
 @@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet.vectorized;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.ParquetDictionaryRowGroupFilter;
+import org.apache.iceberg.parquet.ParquetIO;
+import org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+public class VectorizedParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {
+  private final InputFile input;
+  private final Schema expectedSchema;
+  private final ParquetReadOptions options;
+  private final Function<MessageType, VectorizedReader<T>> batchReaderFunc;
+  private final Expression filter;
+  private final boolean reuseContainers;
+  private final boolean caseSensitive;
+  private final int batchSize;
+
+  public VectorizedParquetReader(
+      InputFile input, Schema expectedSchema, ParquetReadOptions options,
+      Function<MessageType, VectorizedReader<T>> readerFunc,
+      Expression filter, boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) {
+    this.input = input;
+    this.expectedSchema = expectedSchema;
+    this.options = options;
+    this.batchReaderFunc = readerFunc;
+    // replace alwaysTrue with null to avoid extra work evaluating a trivial filter
+    this.filter = filter == Expressions.alwaysTrue() ? null : filter;
+    this.reuseContainers = reuseContainers;
+    this.caseSensitive = caseSensitive;
+    this.batchSize = maxRecordsPerBatch;
+  }
+
+  private static class ReadConf<T> {
+    private final ParquetFileReader reader;
+    private final InputFile file;
+    private final ParquetReadOptions options;
+    private final MessageType projection;
+    private final VectorizedReader<T> model;
+    private final List<BlockMetaData> rowGroups;
+    private final boolean[] shouldSkip;
+    private final long totalValues;
+    private final boolean reuseContainers;
+    private final int batchSize;
+
+    @SuppressWarnings("unchecked")
+    ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
+        Function<MessageType, VectorizedReader<T>> readerFunc, boolean reuseContainers,
+        boolean caseSensitive, int bSize) {
+      this.file = file;
+      this.options = options;
+      this.reader = newReader(file, options);
+
+      MessageType fileSchema = reader.getFileMetaData().getSchema();
+
+      boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
+      MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
+
+      this.projection = hasIds ?
+          ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
+          ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+      this.model = readerFunc.apply(typeWithIds);
+      this.rowGroups = reader.getRowGroups();
+      this.shouldSkip = new boolean[rowGroups.size()];
+
+      ParquetMetricsRowGroupFilter statsFilter = null;
+      ParquetDictionaryRowGroupFilter dictFilter = null;
+      if (filter != null) {
+        statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+        dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
+      }
+
+      long computedTotalValues = 0L;
+      for (int i = 0; i < shouldSkip.length; i += 1) {
+        BlockMetaData rowGroup = rowGroups.get(i);
+        boolean shouldRead = filter == null || (
+            statsFilter.shouldRead(typeWithIds, rowGroup) &&
+                dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
+        this.shouldSkip[i] = !shouldRead;
+        if (shouldRead) {
+          computedTotalValues += rowGroup.getRowCount();
+        }
+      }
+
+      this.totalValues = computedTotalValues;
+      this.reuseContainers = reuseContainers;
+      this.batchSize = bSize;
+    }
+
+    ReadConf(ReadConf<T> toCopy) {
+      this.reader = null;
+      this.file = toCopy.file;
+      this.options = toCopy.options;
+      this.projection = toCopy.projection;
+      this.model = toCopy.model;
+      this.rowGroups = toCopy.rowGroups;
+      this.shouldSkip = toCopy.shouldSkip;
+      this.totalValues = toCopy.totalValues;
+      this.reuseContainers = toCopy.reuseContainers;
+      this.batchSize = toCopy.batchSize;
+    }
+
+    ParquetFileReader reader() {
+      if (reader != null) {
+        reader.setRequestedSchema(projection);
+        return reader;
+      }
+
+      ParquetFileReader newReader = newReader(file, options);
+      newReader.setRequestedSchema(projection);
+      return newReader;
+    }
+
+    VectorizedReader model() {
+      return model;
+    }
+
+    boolean[] shouldSkip() {
+      return shouldSkip;
+    }
+
+    long totalValues() {
+      return totalValues;
+    }
+
+    boolean reuseContainers() {
+      return reuseContainers;
+    }
+
+    int batchSize() {
+      return batchSize;
+    }
+
+    private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) {
+      try {
+        return ParquetFileReader.open(ParquetIO.file(file), options);
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to open Parquet file: %s", file.location());
+      }
+    }
+
+    ReadConf<T> copy() {
+      return new ReadConf<>(this);
+    }
+  }
+
+  private ReadConf conf = null;
+
+  private ReadConf init() {
+    if (conf == null) {
+      ReadConf readConf = new ReadConf(
+          input, options, expectedSchema, filter, batchReaderFunc, reuseContainers, caseSensitive, batchSize);
+      this.conf = readConf.copy();
+      return readConf;
+    }
+
+    return conf;
+  }
+
+  @Override
+  public Iterator iterator() {
+    FileIterator iter = new FileIterator(init());
+    addCloseable(iter);
+    return iter;
+  }
+
+  private static class FileIterator<T> implements Iterator<T>, Closeable {
+    private final ParquetFileReader reader;
+    private final boolean[] shouldSkip;
+    private final VectorizedReader<T> model;
+    private final long totalValues;
+    private final int batchSize;
+
+    private int nextRowGroup = 0;
+    private long nextRowGroupStart = 0;
+    private long valuesRead = 0;
+    private T last = null;
+
+    FileIterator(ReadConf conf) {
+      this.reader = conf.reader();
+      this.shouldSkip = conf.shouldSkip();
+      this.model = conf.model();
+      this.totalValues = conf.totalValues();
+      this.model.reuseContainers(conf.reuseContainers());
+      this.batchSize = conf.batchSize();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return valuesRead < totalValues;
+    }
+
+    @Override
+    public T next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      if (valuesRead >= nextRowGroupStart) {
+        advance();
+      }
+      this.last = model.read();
+      valuesRead += Math.min(nextRowGroupStart - valuesRead, batchSize);
+      return last;
+    }
+
+    private void advance() {
+      while (shouldSkip[nextRowGroup]) {
+        nextRowGroup += 1;
+        reader.skipNextRowGroup();
+      }
+
+      PageReadStore pages;
+      DictionaryPageReadStore dictionaryPageReadStore;
+      try {
+        dictionaryPageReadStore = reader.getNextDictionaryReader();
+        pages = reader.readNextRowGroup();
+      } catch (IOException e) {
+        throw new RuntimeIOException(e);
+      }
+
+      nextRowGroupStart += pages.getRowCount();
+      nextRowGroup += 1;
+      model.setRowGroupInfo(
+          pages,
+          dictionaryPageReadStore,
+          dictionaryPageReadStore == null ? null : buildColumnDictEncodedMap(reader.getRowGroups()));
 
 Review comment:
   Because the map will be produced from metadata for all row groups, I think this should be set on the reader in the constructor, up where `reuseContainers` is called.
   
   And, if both the dictionary store and the map are removed, then the update here is the same as in the record read case and should match:
   
   ```java
     void setPageSource(PageReadStore pageStore);
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] samarthjain commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
samarthjain commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361555601
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetReader.java
 ##########
 @@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet.vectorized;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.ParquetDictionaryRowGroupFilter;
+import org.apache.iceberg.parquet.ParquetIO;
+import org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+public class VectorizedParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {
+  private final InputFile input;
+  private final Schema expectedSchema;
+  private final ParquetReadOptions options;
+  private final Function<MessageType, VectorizedReader<T>> batchReaderFunc;
+  private final Expression filter;
+  private final boolean reuseContainers;
+  private final boolean caseSensitive;
+  private final int batchSize;
+
+  public VectorizedParquetReader(
+      InputFile input, Schema expectedSchema, ParquetReadOptions options,
+      Function<MessageType, VectorizedReader<T>> readerFunc,
+      Expression filter, boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) {
+    this.input = input;
+    this.expectedSchema = expectedSchema;
+    this.options = options;
+    this.batchReaderFunc = readerFunc;
+    // replace alwaysTrue with null to avoid extra work evaluating a trivial filter
+    this.filter = filter == Expressions.alwaysTrue() ? null : filter;
+    this.reuseContainers = reuseContainers;
+    this.caseSensitive = caseSensitive;
+    this.batchSize = maxRecordsPerBatch;
+  }
+
+  private static class ReadConf<T> {
+    private final ParquetFileReader reader;
+    private final InputFile file;
+    private final ParquetReadOptions options;
+    private final MessageType projection;
+    private final VectorizedReader<T> model;
+    private final List<BlockMetaData> rowGroups;
+    private final boolean[] shouldSkip;
+    private final long totalValues;
+    private final boolean reuseContainers;
+    private final int batchSize;
+
+    @SuppressWarnings("unchecked")
+    ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
+        Function<MessageType, VectorizedReader<T>> readerFunc, boolean reuseContainers,
+        boolean caseSensitive, int bSize) {
+      this.file = file;
+      this.options = options;
+      this.reader = newReader(file, options);
+
+      MessageType fileSchema = reader.getFileMetaData().getSchema();
+
+      boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
+      MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
+
+      this.projection = hasIds ?
+          ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
+          ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+      this.model = readerFunc.apply(typeWithIds);
+      this.rowGroups = reader.getRowGroups();
+      this.shouldSkip = new boolean[rowGroups.size()];
+
+      ParquetMetricsRowGroupFilter statsFilter = null;
+      ParquetDictionaryRowGroupFilter dictFilter = null;
+      if (filter != null) {
+        statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+        dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
+      }
+
+      long computedTotalValues = 0L;
+      for (int i = 0; i < shouldSkip.length; i += 1) {
+        BlockMetaData rowGroup = rowGroups.get(i);
+        boolean shouldRead = filter == null || (
+            statsFilter.shouldRead(typeWithIds, rowGroup) &&
+                dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
+        this.shouldSkip[i] = !shouldRead;
+        if (shouldRead) {
+          computedTotalValues += rowGroup.getRowCount();
+        }
+      }
+
+      this.totalValues = computedTotalValues;
+      this.reuseContainers = reuseContainers;
+      this.batchSize = bSize;
+    }
+
+    ReadConf(ReadConf<T> toCopy) {
+      this.reader = null;
+      this.file = toCopy.file;
+      this.options = toCopy.options;
+      this.projection = toCopy.projection;
+      this.model = toCopy.model;
+      this.rowGroups = toCopy.rowGroups;
+      this.shouldSkip = toCopy.shouldSkip;
+      this.totalValues = toCopy.totalValues;
+      this.reuseContainers = toCopy.reuseContainers;
+      this.batchSize = toCopy.batchSize;
+    }
+
+    ParquetFileReader reader() {
+      if (reader != null) {
+        reader.setRequestedSchema(projection);
+        return reader;
+      }
+
+      ParquetFileReader newReader = newReader(file, options);
+      newReader.setRequestedSchema(projection);
+      return newReader;
+    }
+
+    VectorizedReader model() {
+      return model;
+    }
+
+    boolean[] shouldSkip() {
+      return shouldSkip;
+    }
+
+    long totalValues() {
+      return totalValues;
+    }
+
+    boolean reuseContainers() {
+      return reuseContainers;
+    }
+
+    int batchSize() {
+      return batchSize;
+    }
+
+    private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) {
+      try {
+        return ParquetFileReader.open(ParquetIO.file(file), options);
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to open Parquet file: %s", file.location());
+      }
+    }
+
+    ReadConf<T> copy() {
+      return new ReadConf<>(this);
+    }
+  }
+
+  private ReadConf conf = null;
+
+  private ReadConf init() {
+    if (conf == null) {
+      ReadConf readConf = new ReadConf(
+          input, options, expectedSchema, filter, batchReaderFunc, reuseContainers, caseSensitive, batchSize);
+      this.conf = readConf.copy();
+      return readConf;
+    }
+
+    return conf;
+  }
+
+  @Override
+  public Iterator iterator() {
+    FileIterator iter = new FileIterator(init());
+    addCloseable(iter);
+    return iter;
+  }
+
+  private static class FileIterator<T> implements Iterator<T>, Closeable {
+    private final ParquetFileReader reader;
+    private final boolean[] shouldSkip;
+    private final VectorizedReader<T> model;
+    private final long totalValues;
+    private final int batchSize;
+
+    private int nextRowGroup = 0;
+    private long nextRowGroupStart = 0;
+    private long valuesRead = 0;
+    private T last = null;
+
+    FileIterator(ReadConf conf) {
+      this.reader = conf.reader();
+      this.shouldSkip = conf.shouldSkip();
+      this.model = conf.model();
+      this.totalValues = conf.totalValues();
+      this.model.reuseContainers(conf.reuseContainers());
+      this.batchSize = conf.batchSize();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return valuesRead < totalValues;
+    }
+
+    @Override
+    public T next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      if (valuesRead >= nextRowGroupStart) {
+        advance();
+      }
+      this.last = model.read();
+      valuesRead += Math.min(nextRowGroupStart - valuesRead, batchSize);
 
 Review comment:
   Made the change.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361028490
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetReader.java
 ##########
 @@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet.vectorized;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.ParquetDictionaryRowGroupFilter;
+import org.apache.iceberg.parquet.ParquetIO;
+import org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+public class VectorizedParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {
+  private final InputFile input;
+  private final Schema expectedSchema;
+  private final ParquetReadOptions options;
+  private final Function<MessageType, VectorizedReader<T>> batchReaderFunc;
+  private final Expression filter;
+  private final boolean reuseContainers;
+  private final boolean caseSensitive;
+  private final int batchSize;
+
+  public VectorizedParquetReader(
+      InputFile input, Schema expectedSchema, ParquetReadOptions options,
+      Function<MessageType, VectorizedReader<T>> readerFunc,
+      Expression filter, boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) {
+    this.input = input;
+    this.expectedSchema = expectedSchema;
+    this.options = options;
+    this.batchReaderFunc = readerFunc;
+    // replace alwaysTrue with null to avoid extra work evaluating a trivial filter
+    this.filter = filter == Expressions.alwaysTrue() ? null : filter;
+    this.reuseContainers = reuseContainers;
+    this.caseSensitive = caseSensitive;
+    this.batchSize = maxRecordsPerBatch;
+  }
+
+  private static class ReadConf<T> {
+    private final ParquetFileReader reader;
+    private final InputFile file;
+    private final ParquetReadOptions options;
+    private final MessageType projection;
+    private final VectorizedReader<T> model;
+    private final List<BlockMetaData> rowGroups;
+    private final boolean[] shouldSkip;
+    private final long totalValues;
+    private final boolean reuseContainers;
+    private final int batchSize;
+
+    @SuppressWarnings("unchecked")
+    ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
+        Function<MessageType, VectorizedReader<T>> readerFunc, boolean reuseContainers,
+        boolean caseSensitive, int bSize) {
+      this.file = file;
+      this.options = options;
+      this.reader = newReader(file, options);
+
+      MessageType fileSchema = reader.getFileMetaData().getSchema();
+
+      boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
+      MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
+
+      this.projection = hasIds ?
+          ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
+          ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+      this.model = readerFunc.apply(typeWithIds);
+      this.rowGroups = reader.getRowGroups();
+      this.shouldSkip = new boolean[rowGroups.size()];
+
+      ParquetMetricsRowGroupFilter statsFilter = null;
+      ParquetDictionaryRowGroupFilter dictFilter = null;
+      if (filter != null) {
+        statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+        dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
+      }
+
+      long computedTotalValues = 0L;
+      for (int i = 0; i < shouldSkip.length; i += 1) {
+        BlockMetaData rowGroup = rowGroups.get(i);
+        boolean shouldRead = filter == null || (
+            statsFilter.shouldRead(typeWithIds, rowGroup) &&
+                dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
+        this.shouldSkip[i] = !shouldRead;
+        if (shouldRead) {
+          computedTotalValues += rowGroup.getRowCount();
+        }
+      }
+
+      this.totalValues = computedTotalValues;
+      this.reuseContainers = reuseContainers;
+      this.batchSize = bSize;
+    }
+
+    ReadConf(ReadConf<T> toCopy) {
+      this.reader = null;
+      this.file = toCopy.file;
+      this.options = toCopy.options;
+      this.projection = toCopy.projection;
+      this.model = toCopy.model;
+      this.rowGroups = toCopy.rowGroups;
+      this.shouldSkip = toCopy.shouldSkip;
+      this.totalValues = toCopy.totalValues;
+      this.reuseContainers = toCopy.reuseContainers;
+      this.batchSize = toCopy.batchSize;
+    }
+
+    ParquetFileReader reader() {
+      if (reader != null) {
+        reader.setRequestedSchema(projection);
+        return reader;
+      }
+
+      ParquetFileReader newReader = newReader(file, options);
+      newReader.setRequestedSchema(projection);
+      return newReader;
+    }
+
+    VectorizedReader model() {
+      return model;
+    }
+
+    boolean[] shouldSkip() {
+      return shouldSkip;
+    }
+
+    long totalValues() {
+      return totalValues;
+    }
+
+    boolean reuseContainers() {
+      return reuseContainers;
+    }
+
+    int batchSize() {
+      return batchSize;
+    }
+
+    private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) {
+      try {
+        return ParquetFileReader.open(ParquetIO.file(file), options);
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to open Parquet file: %s", file.location());
+      }
+    }
+
+    ReadConf<T> copy() {
+      return new ReadConf<>(this);
+    }
+  }
+
+  private ReadConf conf = null;
+
+  private ReadConf init() {
+    if (conf == null) {
+      ReadConf readConf = new ReadConf(
+          input, options, expectedSchema, filter, batchReaderFunc, reuseContainers, caseSensitive, batchSize);
+      this.conf = readConf.copy();
+      return readConf;
+    }
+
+    return conf;
+  }
+
+  @Override
+  public Iterator iterator() {
+    FileIterator iter = new FileIterator(init());
+    addCloseable(iter);
+    return iter;
+  }
+
+  private static class FileIterator<T> implements Iterator<T>, Closeable {
+    private final ParquetFileReader reader;
+    private final boolean[] shouldSkip;
+    private final VectorizedReader<T> model;
+    private final long totalValues;
+    private final int batchSize;
+
+    private int nextRowGroup = 0;
+    private long nextRowGroupStart = 0;
+    private long valuesRead = 0;
+    private T last = null;
+
+    FileIterator(ReadConf conf) {
+      this.reader = conf.reader();
+      this.shouldSkip = conf.shouldSkip();
+      this.model = conf.model();
+      this.totalValues = conf.totalValues();
+      this.model.reuseContainers(conf.reuseContainers());
+      this.batchSize = conf.batchSize();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return valuesRead < totalValues;
+    }
+
+    @Override
+    public T next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      if (valuesRead >= nextRowGroupStart) {
+        advance();
+      }
+      this.last = model.read();
+      valuesRead += Math.min(nextRowGroupStart - valuesRead, batchSize);
+      return last;
+    }
+
+    private void advance() {
+      while (shouldSkip[nextRowGroup]) {
+        nextRowGroup += 1;
+        reader.skipNextRowGroup();
+      }
+
+      PageReadStore pages;
+      DictionaryPageReadStore dictionaryPageReadStore;
+      try {
+        dictionaryPageReadStore = reader.getNextDictionaryReader();
+        pages = reader.readNextRowGroup();
+      } catch (IOException e) {
+        throw new RuntimeIOException(e);
+      }
+
+      nextRowGroupStart += pages.getRowCount();
+      nextRowGroup += 1;
+      model.setRowGroupInfo(
+          pages,
+          dictionaryPageReadStore,
+          dictionaryPageReadStore == null ? null : buildColumnDictEncodedMap(reader.getRowGroups()));
+    }
+
+    /**
+     * Retuns a map of {@link ColumnPath} -> whether all the pages in the row group for this column are dictionary
+     * encoded
+     */
+    private static Map<ColumnPath, Boolean> buildColumnDictEncodedMap(List<BlockMetaData> blockMetaData) {
+      Map<ColumnPath, Boolean> map = new HashMap<>();
+      for (BlockMetaData b : blockMetaData) {
+        for (ColumnChunkMetaData c : b.getColumns()) {
+          map.put(c.getPath(), !ParquetUtil.hasNonDictionaryPages(c));
+        }
+      }
+      return map;
 
 Review comment:
   The effect of this code is to use the result of `hasNonDictionaryPages` for just the last block because the value is replaced each time. I think this should AND the results together so that if any row group has non-dictionary pages, all batches are eagerly decoded.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue merged pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361025438
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetReader.java
 ##########
 @@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet.vectorized;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.ParquetDictionaryRowGroupFilter;
+import org.apache.iceberg.parquet.ParquetIO;
+import org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+public class VectorizedParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {
+  private final InputFile input;
+  private final Schema expectedSchema;
+  private final ParquetReadOptions options;
+  private final Function<MessageType, VectorizedReader<T>> batchReaderFunc;
+  private final Expression filter;
+  private final boolean reuseContainers;
+  private final boolean caseSensitive;
+  private final int batchSize;
+
+  public VectorizedParquetReader(
+      InputFile input, Schema expectedSchema, ParquetReadOptions options,
+      Function<MessageType, VectorizedReader<T>> readerFunc,
+      Expression filter, boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) {
+    this.input = input;
+    this.expectedSchema = expectedSchema;
+    this.options = options;
+    this.batchReaderFunc = readerFunc;
+    // replace alwaysTrue with null to avoid extra work evaluating a trivial filter
+    this.filter = filter == Expressions.alwaysTrue() ? null : filter;
+    this.reuseContainers = reuseContainers;
+    this.caseSensitive = caseSensitive;
+    this.batchSize = maxRecordsPerBatch;
+  }
+
+  private static class ReadConf<T> {
+    private final ParquetFileReader reader;
+    private final InputFile file;
+    private final ParquetReadOptions options;
+    private final MessageType projection;
+    private final VectorizedReader<T> model;
+    private final List<BlockMetaData> rowGroups;
+    private final boolean[] shouldSkip;
+    private final long totalValues;
+    private final boolean reuseContainers;
+    private final int batchSize;
+
+    @SuppressWarnings("unchecked")
+    ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
+        Function<MessageType, VectorizedReader<T>> readerFunc, boolean reuseContainers,
+        boolean caseSensitive, int bSize) {
+      this.file = file;
+      this.options = options;
+      this.reader = newReader(file, options);
+
+      MessageType fileSchema = reader.getFileMetaData().getSchema();
+
+      boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
+      MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
+
+      this.projection = hasIds ?
+          ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
+          ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+      this.model = readerFunc.apply(typeWithIds);
+      this.rowGroups = reader.getRowGroups();
+      this.shouldSkip = new boolean[rowGroups.size()];
+
+      ParquetMetricsRowGroupFilter statsFilter = null;
+      ParquetDictionaryRowGroupFilter dictFilter = null;
+      if (filter != null) {
+        statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+        dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
+      }
+
+      long computedTotalValues = 0L;
+      for (int i = 0; i < shouldSkip.length; i += 1) {
+        BlockMetaData rowGroup = rowGroups.get(i);
+        boolean shouldRead = filter == null || (
+            statsFilter.shouldRead(typeWithIds, rowGroup) &&
+                dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
+        this.shouldSkip[i] = !shouldRead;
+        if (shouldRead) {
+          computedTotalValues += rowGroup.getRowCount();
+        }
+      }
+
+      this.totalValues = computedTotalValues;
+      this.reuseContainers = reuseContainers;
+      this.batchSize = bSize;
+    }
+
+    ReadConf(ReadConf<T> toCopy) {
+      this.reader = null;
+      this.file = toCopy.file;
+      this.options = toCopy.options;
+      this.projection = toCopy.projection;
+      this.model = toCopy.model;
+      this.rowGroups = toCopy.rowGroups;
+      this.shouldSkip = toCopy.shouldSkip;
+      this.totalValues = toCopy.totalValues;
+      this.reuseContainers = toCopy.reuseContainers;
+      this.batchSize = toCopy.batchSize;
+    }
+
+    ParquetFileReader reader() {
+      if (reader != null) {
+        reader.setRequestedSchema(projection);
+        return reader;
+      }
+
+      ParquetFileReader newReader = newReader(file, options);
+      newReader.setRequestedSchema(projection);
+      return newReader;
+    }
+
+    VectorizedReader model() {
+      return model;
+    }
+
+    boolean[] shouldSkip() {
+      return shouldSkip;
+    }
+
+    long totalValues() {
+      return totalValues;
+    }
+
+    boolean reuseContainers() {
+      return reuseContainers;
+    }
+
+    int batchSize() {
+      return batchSize;
+    }
+
+    private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) {
+      try {
+        return ParquetFileReader.open(ParquetIO.file(file), options);
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to open Parquet file: %s", file.location());
+      }
+    }
+
+    ReadConf<T> copy() {
+      return new ReadConf<>(this);
+    }
+  }
+
+  private ReadConf conf = null;
+
+  private ReadConf init() {
+    if (conf == null) {
+      ReadConf readConf = new ReadConf(
+          input, options, expectedSchema, filter, batchReaderFunc, reuseContainers, caseSensitive, batchSize);
+      this.conf = readConf.copy();
+      return readConf;
+    }
+
+    return conf;
+  }
+
+  @Override
+  public Iterator iterator() {
+    FileIterator iter = new FileIterator(init());
+    addCloseable(iter);
+    return iter;
+  }
+
+  private static class FileIterator<T> implements Iterator<T>, Closeable {
+    private final ParquetFileReader reader;
+    private final boolean[] shouldSkip;
+    private final VectorizedReader<T> model;
+    private final long totalValues;
+    private final int batchSize;
+
+    private int nextRowGroup = 0;
+    private long nextRowGroupStart = 0;
+    private long valuesRead = 0;
+    private T last = null;
+
+    FileIterator(ReadConf conf) {
+      this.reader = conf.reader();
+      this.shouldSkip = conf.shouldSkip();
+      this.model = conf.model();
+      this.totalValues = conf.totalValues();
+      this.model.reuseContainers(conf.reuseContainers());
+      this.batchSize = conf.batchSize();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return valuesRead < totalValues;
+    }
+
+    @Override
+    public T next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      if (valuesRead >= nextRowGroupStart) {
+        advance();
+      }
+      this.last = model.read();
+      valuesRead += Math.min(nextRowGroupStart - valuesRead, batchSize);
 
 Review comment:
   This assumes that the number of rows read matches the number of rows expected, instead of using the actual number of rows read or requesting the model read some number of rows. I think this should probably pass in the required row count to ensure the accounting doesn't diverge.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361019971
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedReader.java
 ##########
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet.vectorized;
+
+import java.util.Map;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/**
+ * Interface for vectorized Iceberg readers.
+ */
+public interface VectorizedReader<T> {
+  T read();
+
+  void setRowGroupInfo(PageReadStore pages, DictionaryPageReadStore dictionaryPageReadStore,
 
 Review comment:
   `PageReadStore` is already a `DictionaryPageReadStore`. The implementation, `ColumnChunkPageReadStore` implements both, and the implementation to get a dictionary page just reuses the `PageReadStore.getPageReader` method:
   
   ```java
     @Override
     public DictionaryPage readDictionaryPage(ColumnDescriptor descriptor) {
       return getPageReader(descriptor).readDictionaryPage();
     }
   ```
   
   I think you can remove the `DictionaryPageReadStore` arg.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361722884
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java
 ##########
 @@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.util.Map;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/**
+ * Interface for vectorized Iceberg readers.
+ */
 
 Review comment:
   This should have `@param <T>` to document `T`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361723414
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
 ##########
 @@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+public class VectorizedParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {
+  private final InputFile input;
+  private final Schema expectedSchema;
+  private final ParquetReadOptions options;
+  private final Function<MessageType, VectorizedReader<?>> batchReaderFunc;
+  private final Expression filter;
+  private boolean reuseContainers;
+  private final boolean caseSensitive;
+  private final int batchSize;
+
+  public VectorizedParquetReader(
+      InputFile input, Schema expectedSchema, ParquetReadOptions options,
+      Function<MessageType, VectorizedReader<?>> readerFunc,
+      Expression filter, boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) {
+    this.input = input;
+    this.expectedSchema = expectedSchema;
+    this.options = options;
+    this.batchReaderFunc = readerFunc;
+    // replace alwaysTrue with null to avoid extra work evaluating a trivial filter
+    this.filter = filter == Expressions.alwaysTrue() ? null : filter;
+    this.reuseContainers = reuseContainers;
+    this.caseSensitive = caseSensitive;
+    this.batchSize = maxRecordsPerBatch;
+  }
+
+  private ReadConf conf = null;
+
+  private ReadConf init() {
+    if (conf == null) {
+      ReadConf readConf = new ReadConf(
+          input, options, expectedSchema, filter, null, batchReaderFunc, reuseContainers, caseSensitive, batchSize);
+      this.conf = readConf.copy();
+      return readConf;
+    }
+    return conf;
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+    FileIterator<T> iter = new FileIterator<>(init());
+    addCloseable(iter);
+    return iter;
+  }
+
+  private static class FileIterator<T> implements Iterator<T>, Closeable {
+    private final ParquetFileReader reader;
+    private final boolean[] shouldSkip;
+    private final VectorizedReader<T> model;
+    private final long totalValues;
+    private final int batchSize;
+    private final Map<ColumnPath, Boolean> columnDictEncodedMap;
+    private int nextRowGroup = 0;
+    private long nextRowGroupStart = 0;
+    private long valuesRead = 0;
+    private T last = null;
+
+    FileIterator(ReadConf conf) {
+      this.reader = conf.reader();
+      this.shouldSkip = conf.shouldSkip();
+      this.model = conf.vectorizedModel();
+      this.totalValues = conf.totalValues();
+      this.model.reuseContainers(conf.reuseContainers());
+      this.batchSize = conf.batchSize();
+      this.columnDictEncodedMap = conf.columnDictEncodedMap();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return valuesRead < totalValues;
+    }
+
+    @Override
+    public T next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      if (valuesRead >= nextRowGroupStart) {
+        advance();
+      }
+      long numValuesToRead = Math.min(nextRowGroupStart - valuesRead, batchSize);
+      // batchSize is an integer, so casting to integer is safe
+      this.last = model.read((int) numValuesToRead);
+      valuesRead += numValuesToRead;
+      return last;
+    }
+
+    private void advance() {
+      while (shouldSkip[nextRowGroup]) {
+        nextRowGroup += 1;
+        reader.skipNextRowGroup();
+      }
+
+      PageReadStore pages;
+      DictionaryPageReadStore dictionaryPageReadStore;
+      try {
+        dictionaryPageReadStore = reader.getNextDictionaryReader();
 
 Review comment:
   Looks like this isn't used anymore.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361020814
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetReader.java
 ##########
 @@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet.vectorized;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.ParquetDictionaryRowGroupFilter;
+import org.apache.iceberg.parquet.ParquetIO;
+import org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+public class VectorizedParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {
+  private final InputFile input;
+  private final Schema expectedSchema;
+  private final ParquetReadOptions options;
+  private final Function<MessageType, VectorizedReader<T>> batchReaderFunc;
+  private final Expression filter;
+  private final boolean reuseContainers;
+  private final boolean caseSensitive;
+  private final int batchSize;
+
+  public VectorizedParquetReader(
+      InputFile input, Schema expectedSchema, ParquetReadOptions options,
+      Function<MessageType, VectorizedReader<T>> readerFunc,
+      Expression filter, boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) {
+    this.input = input;
+    this.expectedSchema = expectedSchema;
+    this.options = options;
+    this.batchReaderFunc = readerFunc;
+    // replace alwaysTrue with null to avoid extra work evaluating a trivial filter
+    this.filter = filter == Expressions.alwaysTrue() ? null : filter;
+    this.reuseContainers = reuseContainers;
+    this.caseSensitive = caseSensitive;
+    this.batchSize = maxRecordsPerBatch;
+  }
+
+  private static class ReadConf<T> {
+    private final ParquetFileReader reader;
+    private final InputFile file;
+    private final ParquetReadOptions options;
+    private final MessageType projection;
+    private final VectorizedReader<T> model;
+    private final List<BlockMetaData> rowGroups;
+    private final boolean[] shouldSkip;
+    private final long totalValues;
+    private final boolean reuseContainers;
+    private final int batchSize;
+
+    @SuppressWarnings("unchecked")
+    ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
+        Function<MessageType, VectorizedReader<T>> readerFunc, boolean reuseContainers,
+        boolean caseSensitive, int bSize) {
+      this.file = file;
+      this.options = options;
+      this.reader = newReader(file, options);
+
+      MessageType fileSchema = reader.getFileMetaData().getSchema();
+
+      boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
+      MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
+
+      this.projection = hasIds ?
+          ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
+          ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+      this.model = readerFunc.apply(typeWithIds);
+      this.rowGroups = reader.getRowGroups();
+      this.shouldSkip = new boolean[rowGroups.size()];
+
+      ParquetMetricsRowGroupFilter statsFilter = null;
+      ParquetDictionaryRowGroupFilter dictFilter = null;
+      if (filter != null) {
+        statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+        dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
+      }
+
+      long computedTotalValues = 0L;
+      for (int i = 0; i < shouldSkip.length; i += 1) {
+        BlockMetaData rowGroup = rowGroups.get(i);
+        boolean shouldRead = filter == null || (
+            statsFilter.shouldRead(typeWithIds, rowGroup) &&
+                dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
+        this.shouldSkip[i] = !shouldRead;
+        if (shouldRead) {
+          computedTotalValues += rowGroup.getRowCount();
+        }
+      }
+
+      this.totalValues = computedTotalValues;
+      this.reuseContainers = reuseContainers;
+      this.batchSize = bSize;
+    }
+
+    ReadConf(ReadConf<T> toCopy) {
+      this.reader = null;
+      this.file = toCopy.file;
+      this.options = toCopy.options;
+      this.projection = toCopy.projection;
+      this.model = toCopy.model;
+      this.rowGroups = toCopy.rowGroups;
+      this.shouldSkip = toCopy.shouldSkip;
+      this.totalValues = toCopy.totalValues;
+      this.reuseContainers = toCopy.reuseContainers;
+      this.batchSize = toCopy.batchSize;
+    }
+
+    ParquetFileReader reader() {
+      if (reader != null) {
+        reader.setRequestedSchema(projection);
+        return reader;
+      }
+
+      ParquetFileReader newReader = newReader(file, options);
+      newReader.setRequestedSchema(projection);
+      return newReader;
+    }
+
+    VectorizedReader model() {
+      return model;
+    }
+
+    boolean[] shouldSkip() {
+      return shouldSkip;
+    }
+
+    long totalValues() {
+      return totalValues;
+    }
+
+    boolean reuseContainers() {
+      return reuseContainers;
+    }
+
+    int batchSize() {
+      return batchSize;
+    }
+
+    private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) {
+      try {
+        return ParquetFileReader.open(ParquetIO.file(file), options);
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to open Parquet file: %s", file.location());
+      }
+    }
+
+    ReadConf<T> copy() {
+      return new ReadConf<>(this);
+    }
+  }
+
+  private ReadConf conf = null;
+
+  private ReadConf init() {
+    if (conf == null) {
+      ReadConf readConf = new ReadConf(
+          input, options, expectedSchema, filter, batchReaderFunc, reuseContainers, caseSensitive, batchSize);
+      this.conf = readConf.copy();
+      return readConf;
+    }
+
+    return conf;
+  }
+
+  @Override
+  public Iterator iterator() {
 
 Review comment:
   Can you check types? I think this should be `Iterator<T>`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361723981
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java
 ##########
 @@ -41,11 +41,11 @@
 /**
  * Methods in this class translate from the IO API to Parquet's IO API.
  */
-class ParquetIO {
+public class ParquetIO {
 
 Review comment:
   Why do these methods need to be public? These should only be used to convert Iceberg's `InputFile` and `OutputFile` interfaces to versions that can be used with Parquet. Iceberg should keep these internal because all the external APIs should use Iceberg's interfaces.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] samarthjain commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
samarthjain commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361555615
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetReader.java
 ##########
 @@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet.vectorized;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.ParquetDictionaryRowGroupFilter;
+import org.apache.iceberg.parquet.ParquetIO;
+import org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+public class VectorizedParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {
+  private final InputFile input;
+  private final Schema expectedSchema;
+  private final ParquetReadOptions options;
+  private final Function<MessageType, VectorizedReader<T>> batchReaderFunc;
+  private final Expression filter;
+  private final boolean reuseContainers;
+  private final boolean caseSensitive;
+  private final int batchSize;
+
+  public VectorizedParquetReader(
+      InputFile input, Schema expectedSchema, ParquetReadOptions options,
+      Function<MessageType, VectorizedReader<T>> readerFunc,
+      Expression filter, boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) {
+    this.input = input;
+    this.expectedSchema = expectedSchema;
+    this.options = options;
+    this.batchReaderFunc = readerFunc;
+    // replace alwaysTrue with null to avoid extra work evaluating a trivial filter
+    this.filter = filter == Expressions.alwaysTrue() ? null : filter;
+    this.reuseContainers = reuseContainers;
+    this.caseSensitive = caseSensitive;
+    this.batchSize = maxRecordsPerBatch;
+  }
+
+  private static class ReadConf<T> {
+    private final ParquetFileReader reader;
+    private final InputFile file;
+    private final ParquetReadOptions options;
+    private final MessageType projection;
+    private final VectorizedReader<T> model;
+    private final List<BlockMetaData> rowGroups;
+    private final boolean[] shouldSkip;
+    private final long totalValues;
+    private final boolean reuseContainers;
+    private final int batchSize;
+
+    @SuppressWarnings("unchecked")
+    ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
+        Function<MessageType, VectorizedReader<T>> readerFunc, boolean reuseContainers,
+        boolean caseSensitive, int bSize) {
+      this.file = file;
+      this.options = options;
+      this.reader = newReader(file, options);
+
+      MessageType fileSchema = reader.getFileMetaData().getSchema();
+
+      boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
+      MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
+
+      this.projection = hasIds ?
+          ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
+          ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+      this.model = readerFunc.apply(typeWithIds);
+      this.rowGroups = reader.getRowGroups();
+      this.shouldSkip = new boolean[rowGroups.size()];
+
+      ParquetMetricsRowGroupFilter statsFilter = null;
+      ParquetDictionaryRowGroupFilter dictFilter = null;
+      if (filter != null) {
+        statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+        dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
+      }
+
+      long computedTotalValues = 0L;
+      for (int i = 0; i < shouldSkip.length; i += 1) {
+        BlockMetaData rowGroup = rowGroups.get(i);
+        boolean shouldRead = filter == null || (
+            statsFilter.shouldRead(typeWithIds, rowGroup) &&
+                dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
+        this.shouldSkip[i] = !shouldRead;
+        if (shouldRead) {
+          computedTotalValues += rowGroup.getRowCount();
+        }
+      }
+
+      this.totalValues = computedTotalValues;
+      this.reuseContainers = reuseContainers;
+      this.batchSize = bSize;
+    }
+
+    ReadConf(ReadConf<T> toCopy) {
+      this.reader = null;
+      this.file = toCopy.file;
+      this.options = toCopy.options;
+      this.projection = toCopy.projection;
+      this.model = toCopy.model;
+      this.rowGroups = toCopy.rowGroups;
+      this.shouldSkip = toCopy.shouldSkip;
+      this.totalValues = toCopy.totalValues;
+      this.reuseContainers = toCopy.reuseContainers;
+      this.batchSize = toCopy.batchSize;
+    }
+
+    ParquetFileReader reader() {
+      if (reader != null) {
+        reader.setRequestedSchema(projection);
+        return reader;
+      }
+
+      ParquetFileReader newReader = newReader(file, options);
+      newReader.setRequestedSchema(projection);
+      return newReader;
+    }
+
+    VectorizedReader model() {
+      return model;
+    }
+
+    boolean[] shouldSkip() {
+      return shouldSkip;
+    }
+
+    long totalValues() {
+      return totalValues;
+    }
+
+    boolean reuseContainers() {
+      return reuseContainers;
+    }
+
+    int batchSize() {
+      return batchSize;
+    }
+
+    private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) {
+      try {
+        return ParquetFileReader.open(ParquetIO.file(file), options);
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to open Parquet file: %s", file.location());
+      }
+    }
+
+    ReadConf<T> copy() {
+      return new ReadConf<>(this);
+    }
+  }
+
+  private ReadConf conf = null;
+
+  private ReadConf init() {
+    if (conf == null) {
+      ReadConf readConf = new ReadConf(
+          input, options, expectedSchema, filter, batchReaderFunc, reuseContainers, caseSensitive, batchSize);
+      this.conf = readConf.copy();
+      return readConf;
+    }
+
+    return conf;
+  }
+
+  @Override
+  public Iterator iterator() {
 
 Review comment:
   Fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] samarthjain commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
samarthjain commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361555678
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
 ##########
 @@ -362,6 +367,16 @@ public ReadBuilder createReaderFunc(Function<MessageType, ParquetValueReader<?>>
       return this;
     }
 
+    public ReadBuilder createBatchedReaderFunc(Function<MessageType, VectorizedReader<?>> func) {
+      this.batchedReaderFunc = func;
+      return this;
+    }
+
+    public ReadBuilder enableBatchedRead() {
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] samarthjain commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
samarthjain commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361555687
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
 ##########
 @@ -456,6 +483,8 @@ public ReadBuilder reuseContainers() {
 
       return new ParquetIterable<>(builder);
     }
+
+
 
 Review comment:
   Fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361723648
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
 ##########
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.InputFile;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Configuration for Parquet readers.
+ *
+ * @param <T> type of value to read
+ */
+class ReadConf<T> {
+  private final ParquetFileReader reader;
+  private final InputFile file;
+  private final ParquetReadOptions options;
+  private final MessageType projection;
+  @Nullable
+  private final ParquetValueReader<T> model;
+  @Nullable
+  private final VectorizedReader<T> vectorizedModel;
+  private final List<BlockMetaData> rowGroups;
+  private final boolean[] shouldSkip;
+  private final long totalValues;
+  private final boolean reuseContainers;
+  @Nullable
+  private final Integer batchSize;
+
+  // This maps tracks whether all the pages of all the row groups of a column are dictionary encoded
+  private final Map<ColumnPath, Boolean> columnDictEncodedMap;
+
+  @SuppressWarnings("unchecked")
+  ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
+           Function<MessageType, ParquetValueReader<?>> readerFunc, Function<MessageType,
+           VectorizedReader<?>> batchedReaderFunc, boolean reuseContainers,
+           boolean caseSensitive, Integer bSize) {
+    this.file = file;
+    this.options = options;
+    this.reader = newReader(file, options);
+    MessageType fileSchema = reader.getFileMetaData().getSchema();
+
+    boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
+    MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
+
+    this.projection = hasIds ?
+        ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
+        ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+    this.rowGroups = reader.getRowGroups();
+    this.shouldSkip = new boolean[rowGroups.size()];
+    ParquetMetricsRowGroupFilter statsFilter = null;
+    ParquetDictionaryRowGroupFilter dictFilter = null;
+    if (filter != null) {
+      statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+      dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
+    }
+    long computedTotalValues = 0L;
+    for (int i = 0; i < shouldSkip.length; i += 1) {
+      BlockMetaData rowGroup = rowGroups.get(i);
+      boolean shouldRead = filter == null || (
+          statsFilter.shouldRead(typeWithIds, rowGroup) &&
+              dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
+      this.shouldSkip[i] = !shouldRead;
+      if (shouldRead) {
+        computedTotalValues += rowGroup.getRowCount();
+      }
+    }
+    this.totalValues = computedTotalValues;
 
 Review comment:
   Looks like the original spacing was removed when this was copied. Can you add newlines back between distinct code sections? I think it's easier to read that way.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361020028
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedReader.java
 ##########
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet.vectorized;
+
+import java.util.Map;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/**
+ * Interface for vectorized Iceberg readers.
+ */
+public interface VectorizedReader<T> {
+  T read();
 
 Review comment:
   These methods could use some simple documentation. Mostly for the arguments, like `columnPathBooleanMap`. What is that?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] samarthjain commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
samarthjain commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361555652
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedReader.java
 ##########
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet.vectorized;
+
+import java.util.Map;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/**
+ * Interface for vectorized Iceberg readers.
+ */
+public interface VectorizedReader<T> {
+  T read();
 
 Review comment:
   Added docs.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361722003
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
 ##########
 @@ -377,9 +384,15 @@ public ReadBuilder reuseContainers() {
       return this;
     }
 
-    @SuppressWarnings("unchecked")
+    public ReadBuilder recordsPerBatch(int numRowsPerBatch) {
+
 
 Review comment:
   Nit: we don't typically add newlines at the start of methods.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361723517
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
 ##########
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.InputFile;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Configuration for Parquet readers.
+ *
+ * @param <T> type of value to read
+ */
+class ReadConf<T> {
+  private final ParquetFileReader reader;
+  private final InputFile file;
+  private final ParquetReadOptions options;
+  private final MessageType projection;
+  @Nullable
+  private final ParquetValueReader<T> model;
+  @Nullable
+  private final VectorizedReader<T> vectorizedModel;
+  private final List<BlockMetaData> rowGroups;
+  private final boolean[] shouldSkip;
+  private final long totalValues;
+  private final boolean reuseContainers;
+  @Nullable
+  private final Integer batchSize;
+
+  // This maps tracks whether all the pages of all the row groups of a column are dictionary encoded
+  private final Map<ColumnPath, Boolean> columnDictEncodedMap;
+
+  @SuppressWarnings("unchecked")
+  ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
+           Function<MessageType, ParquetValueReader<?>> readerFunc, Function<MessageType,
+           VectorizedReader<?>> batchedReaderFunc, boolean reuseContainers,
+           boolean caseSensitive, Integer bSize) {
+    this.file = file;
+    this.options = options;
+    this.reader = newReader(file, options);
+    MessageType fileSchema = reader.getFileMetaData().getSchema();
+
+    boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
+    MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
+
+    this.projection = hasIds ?
+        ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
+        ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+    this.rowGroups = reader.getRowGroups();
+    this.shouldSkip = new boolean[rowGroups.size()];
+    ParquetMetricsRowGroupFilter statsFilter = null;
+    ParquetDictionaryRowGroupFilter dictFilter = null;
+    if (filter != null) {
+      statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+      dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
+    }
+    long computedTotalValues = 0L;
+    for (int i = 0; i < shouldSkip.length; i += 1) {
+      BlockMetaData rowGroup = rowGroups.get(i);
+      boolean shouldRead = filter == null || (
+          statsFilter.shouldRead(typeWithIds, rowGroup) &&
+              dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
+      this.shouldSkip[i] = !shouldRead;
+      if (shouldRead) {
+        computedTotalValues += rowGroup.getRowCount();
+      }
+    }
+    this.totalValues = computedTotalValues;
+    if (readerFunc != null) {
+      this.model = (ParquetValueReader<T>) readerFunc.apply(typeWithIds);
+      this.vectorizedModel = null;
+      this.columnDictEncodedMap = null;
+    } else {
+      this.model = null;
+      this.vectorizedModel = (VectorizedReader<T>) batchedReaderFunc.apply(typeWithIds);
+      this.columnDictEncodedMap = buildColumnDictEncodedMap();
+    }
+    this.reuseContainers = reuseContainers;
+    this.batchSize = bSize;
+  }
+
+  ReadConf(ReadConf<T> toCopy) {
 
 Review comment:
   Can this be private?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361759771
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
 ##########
 @@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.InputFile;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Configuration for Parquet readers.
+ *
+ * @param <T> type of value to read
+ */
+class ReadConf<T> {
+  private final ParquetFileReader reader;
+  private final InputFile file;
+  private final ParquetReadOptions options;
+  private final MessageType projection;
+  @Nullable
+  private final ParquetValueReader<T> model;
+  @Nullable
+  private final VectorizedReader<T> vectorizedModel;
+  private final List<BlockMetaData> rowGroups;
+  private final boolean[] shouldSkip;
+  private final long totalValues;
+  private final boolean reuseContainers;
+  @Nullable
+  private final Integer batchSize;
+
+  // Indexed by row group with nulls for row groups that are skipped
+  private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetaDataForRowGroups;
+
+  @SuppressWarnings("unchecked")
+  ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
+           Function<MessageType, ParquetValueReader<?>> readerFunc, Function<MessageType,
+           VectorizedReader<?>> batchedReaderFunc, boolean reuseContainers,
+           boolean caseSensitive, Integer bSize) {
+    this.file = file;
+    this.options = options;
+    this.reader = newReader(file, options);
+    MessageType fileSchema = reader.getFileMetaData().getSchema();
+
+    boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
+    MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
+
+    this.projection = hasIds ?
+        ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
+        ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+    this.rowGroups = reader.getRowGroups();
+    this.shouldSkip = new boolean[rowGroups.size()];
+
+    ParquetMetricsRowGroupFilter statsFilter = null;
+    ParquetDictionaryRowGroupFilter dictFilter = null;
+    if (filter != null) {
+      statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+      dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
+    }
+
+    long computedTotalValues = 0L;
+    for (int i = 0; i < shouldSkip.length; i += 1) {
+      BlockMetaData rowGroup = rowGroups.get(i);
+      boolean shouldRead = filter == null || (
+          statsFilter.shouldRead(typeWithIds, rowGroup) &&
+              dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
+      this.shouldSkip[i] = !shouldRead;
+      if (shouldRead) {
+        computedTotalValues += rowGroup.getRowCount();
+      }
+    }
+
+    this.totalValues = computedTotalValues;
+    if (readerFunc != null) {
+      this.model = (ParquetValueReader<T>) readerFunc.apply(typeWithIds);
+      this.vectorizedModel = null;
+      this.columnChunkMetaDataForRowGroups = null;
+    } else {
+      this.model = null;
+      this.vectorizedModel = (VectorizedReader<T>) batchedReaderFunc.apply(typeWithIds);
+      this.columnChunkMetaDataForRowGroups =
+          Stream.generate((Supplier<Map<ColumnPath, ColumnChunkMetaData>>) () -> null)
+              .limit(rowGroups.size()).collect(Collectors.toList());
+      populateColumnChunkMetadataForRowGroups();
+    }
+    this.reuseContainers = reuseContainers;
 
 Review comment:
   Nit: Would be nice to have a newline above this.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] samarthjain commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
samarthjain commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361555575
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetReader.java
 ##########
 @@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet.vectorized;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.ParquetDictionaryRowGroupFilter;
+import org.apache.iceberg.parquet.ParquetIO;
+import org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+public class VectorizedParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {
+  private final InputFile input;
+  private final Schema expectedSchema;
+  private final ParquetReadOptions options;
+  private final Function<MessageType, VectorizedReader<T>> batchReaderFunc;
+  private final Expression filter;
+  private final boolean reuseContainers;
+  private final boolean caseSensitive;
+  private final int batchSize;
+
+  public VectorizedParquetReader(
+      InputFile input, Schema expectedSchema, ParquetReadOptions options,
+      Function<MessageType, VectorizedReader<T>> readerFunc,
+      Expression filter, boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) {
+    this.input = input;
+    this.expectedSchema = expectedSchema;
+    this.options = options;
+    this.batchReaderFunc = readerFunc;
+    // replace alwaysTrue with null to avoid extra work evaluating a trivial filter
+    this.filter = filter == Expressions.alwaysTrue() ? null : filter;
+    this.reuseContainers = reuseContainers;
+    this.caseSensitive = caseSensitive;
+    this.batchSize = maxRecordsPerBatch;
+  }
+
+  private static class ReadConf<T> {
+    private final ParquetFileReader reader;
+    private final InputFile file;
+    private final ParquetReadOptions options;
+    private final MessageType projection;
+    private final VectorizedReader<T> model;
+    private final List<BlockMetaData> rowGroups;
+    private final boolean[] shouldSkip;
+    private final long totalValues;
+    private final boolean reuseContainers;
+    private final int batchSize;
+
+    @SuppressWarnings("unchecked")
+    ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
+        Function<MessageType, VectorizedReader<T>> readerFunc, boolean reuseContainers,
+        boolean caseSensitive, int bSize) {
+      this.file = file;
+      this.options = options;
+      this.reader = newReader(file, options);
+
+      MessageType fileSchema = reader.getFileMetaData().getSchema();
+
+      boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
+      MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
+
+      this.projection = hasIds ?
+          ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
+          ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+      this.model = readerFunc.apply(typeWithIds);
+      this.rowGroups = reader.getRowGroups();
+      this.shouldSkip = new boolean[rowGroups.size()];
+
+      ParquetMetricsRowGroupFilter statsFilter = null;
+      ParquetDictionaryRowGroupFilter dictFilter = null;
+      if (filter != null) {
+        statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+        dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
+      }
+
+      long computedTotalValues = 0L;
+      for (int i = 0; i < shouldSkip.length; i += 1) {
+        BlockMetaData rowGroup = rowGroups.get(i);
+        boolean shouldRead = filter == null || (
+            statsFilter.shouldRead(typeWithIds, rowGroup) &&
+                dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
+        this.shouldSkip[i] = !shouldRead;
+        if (shouldRead) {
+          computedTotalValues += rowGroup.getRowCount();
+        }
+      }
+
+      this.totalValues = computedTotalValues;
+      this.reuseContainers = reuseContainers;
+      this.batchSize = bSize;
+    }
+
+    ReadConf(ReadConf<T> toCopy) {
+      this.reader = null;
+      this.file = toCopy.file;
+      this.options = toCopy.options;
+      this.projection = toCopy.projection;
+      this.model = toCopy.model;
+      this.rowGroups = toCopy.rowGroups;
+      this.shouldSkip = toCopy.shouldSkip;
+      this.totalValues = toCopy.totalValues;
+      this.reuseContainers = toCopy.reuseContainers;
+      this.batchSize = toCopy.batchSize;
+    }
+
+    ParquetFileReader reader() {
+      if (reader != null) {
+        reader.setRequestedSchema(projection);
+        return reader;
+      }
+
+      ParquetFileReader newReader = newReader(file, options);
+      newReader.setRequestedSchema(projection);
+      return newReader;
+    }
+
+    VectorizedReader model() {
+      return model;
+    }
+
+    boolean[] shouldSkip() {
+      return shouldSkip;
+    }
+
+    long totalValues() {
+      return totalValues;
+    }
+
+    boolean reuseContainers() {
+      return reuseContainers;
+    }
+
+    int batchSize() {
+      return batchSize;
+    }
+
+    private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) {
+      try {
+        return ParquetFileReader.open(ParquetIO.file(file), options);
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to open Parquet file: %s", file.location());
+      }
+    }
+
+    ReadConf<T> copy() {
+      return new ReadConf<>(this);
+    }
+  }
+
+  private ReadConf conf = null;
+
+  private ReadConf init() {
+    if (conf == null) {
+      ReadConf readConf = new ReadConf(
+          input, options, expectedSchema, filter, batchReaderFunc, reuseContainers, caseSensitive, batchSize);
+      this.conf = readConf.copy();
+      return readConf;
+    }
+
+    return conf;
+  }
+
+  @Override
+  public Iterator iterator() {
+    FileIterator iter = new FileIterator(init());
+    addCloseable(iter);
+    return iter;
+  }
+
+  private static class FileIterator<T> implements Iterator<T>, Closeable {
+    private final ParquetFileReader reader;
+    private final boolean[] shouldSkip;
+    private final VectorizedReader<T> model;
+    private final long totalValues;
+    private final int batchSize;
+
+    private int nextRowGroup = 0;
+    private long nextRowGroupStart = 0;
+    private long valuesRead = 0;
+    private T last = null;
+
+    FileIterator(ReadConf conf) {
+      this.reader = conf.reader();
+      this.shouldSkip = conf.shouldSkip();
+      this.model = conf.model();
+      this.totalValues = conf.totalValues();
+      this.model.reuseContainers(conf.reuseContainers());
+      this.batchSize = conf.batchSize();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return valuesRead < totalValues;
+    }
+
+    @Override
+    public T next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      if (valuesRead >= nextRowGroupStart) {
+        advance();
+      }
+      this.last = model.read();
+      valuesRead += Math.min(nextRowGroupStart - valuesRead, batchSize);
+      return last;
+    }
+
+    private void advance() {
+      while (shouldSkip[nextRowGroup]) {
+        nextRowGroup += 1;
+        reader.skipNextRowGroup();
+      }
+
+      PageReadStore pages;
+      DictionaryPageReadStore dictionaryPageReadStore;
+      try {
+        dictionaryPageReadStore = reader.getNextDictionaryReader();
+        pages = reader.readNextRowGroup();
+      } catch (IOException e) {
+        throw new RuntimeIOException(e);
+      }
+
+      nextRowGroupStart += pages.getRowCount();
+      nextRowGroup += 1;
+      model.setRowGroupInfo(
+          pages,
+          dictionaryPageReadStore,
+          dictionaryPageReadStore == null ? null : buildColumnDictEncodedMap(reader.getRowGroups()));
+    }
+
+    /**
+     * Retuns a map of {@link ColumnPath} -> whether all the pages in the row group for this column are dictionary
+     * encoded
+     */
+    private static Map<ColumnPath, Boolean> buildColumnDictEncodedMap(List<BlockMetaData> blockMetaData) {
+      Map<ColumnPath, Boolean> map = new HashMap<>();
+      for (BlockMetaData b : blockMetaData) {
+        for (ColumnChunkMetaData c : b.getColumns()) {
+          map.put(c.getPath(), !ParquetUtil.hasNonDictionaryPages(c));
+        }
+      }
+      return map;
 
 Review comment:
   Fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361722391
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetReader.java
 ##########
 @@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet.vectorized;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.ParquetDictionaryRowGroupFilter;
+import org.apache.iceberg.parquet.ParquetIO;
+import org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+public class VectorizedParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {
+  private final InputFile input;
+  private final Schema expectedSchema;
+  private final ParquetReadOptions options;
+  private final Function<MessageType, VectorizedReader<T>> batchReaderFunc;
+  private final Expression filter;
+  private final boolean reuseContainers;
+  private final boolean caseSensitive;
+  private final int batchSize;
+
+  public VectorizedParquetReader(
+      InputFile input, Schema expectedSchema, ParquetReadOptions options,
+      Function<MessageType, VectorizedReader<T>> readerFunc,
+      Expression filter, boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) {
+    this.input = input;
+    this.expectedSchema = expectedSchema;
+    this.options = options;
+    this.batchReaderFunc = readerFunc;
+    // replace alwaysTrue with null to avoid extra work evaluating a trivial filter
+    this.filter = filter == Expressions.alwaysTrue() ? null : filter;
+    this.reuseContainers = reuseContainers;
+    this.caseSensitive = caseSensitive;
+    this.batchSize = maxRecordsPerBatch;
+  }
+
+  private static class ReadConf<T> {
+    private final ParquetFileReader reader;
+    private final InputFile file;
+    private final ParquetReadOptions options;
+    private final MessageType projection;
+    private final VectorizedReader<T> model;
+    private final List<BlockMetaData> rowGroups;
+    private final boolean[] shouldSkip;
+    private final long totalValues;
+    private final boolean reuseContainers;
+    private final int batchSize;
+
+    @SuppressWarnings("unchecked")
+    ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
+        Function<MessageType, VectorizedReader<T>> readerFunc, boolean reuseContainers,
+        boolean caseSensitive, int bSize) {
+      this.file = file;
+      this.options = options;
+      this.reader = newReader(file, options);
+
+      MessageType fileSchema = reader.getFileMetaData().getSchema();
+
+      boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
+      MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
+
+      this.projection = hasIds ?
+          ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
+          ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+      this.model = readerFunc.apply(typeWithIds);
+      this.rowGroups = reader.getRowGroups();
+      this.shouldSkip = new boolean[rowGroups.size()];
+
+      ParquetMetricsRowGroupFilter statsFilter = null;
+      ParquetDictionaryRowGroupFilter dictFilter = null;
+      if (filter != null) {
+        statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+        dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
+      }
+
+      long computedTotalValues = 0L;
+      for (int i = 0; i < shouldSkip.length; i += 1) {
+        BlockMetaData rowGroup = rowGroups.get(i);
+        boolean shouldRead = filter == null || (
+            statsFilter.shouldRead(typeWithIds, rowGroup) &&
+                dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
+        this.shouldSkip[i] = !shouldRead;
+        if (shouldRead) {
+          computedTotalValues += rowGroup.getRowCount();
+        }
+      }
+
+      this.totalValues = computedTotalValues;
+      this.reuseContainers = reuseContainers;
+      this.batchSize = bSize;
+    }
+
+    ReadConf(ReadConf<T> toCopy) {
+      this.reader = null;
+      this.file = toCopy.file;
+      this.options = toCopy.options;
+      this.projection = toCopy.projection;
+      this.model = toCopy.model;
+      this.rowGroups = toCopy.rowGroups;
+      this.shouldSkip = toCopy.shouldSkip;
+      this.totalValues = toCopy.totalValues;
+      this.reuseContainers = toCopy.reuseContainers;
+      this.batchSize = toCopy.batchSize;
+    }
+
+    ParquetFileReader reader() {
+      if (reader != null) {
+        reader.setRequestedSchema(projection);
+        return reader;
+      }
+
+      ParquetFileReader newReader = newReader(file, options);
+      newReader.setRequestedSchema(projection);
+      return newReader;
+    }
+
+    VectorizedReader model() {
+      return model;
+    }
+
+    boolean[] shouldSkip() {
+      return shouldSkip;
+    }
+
+    long totalValues() {
+      return totalValues;
+    }
+
+    boolean reuseContainers() {
+      return reuseContainers;
+    }
+
+    int batchSize() {
+      return batchSize;
+    }
+
+    private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) {
+      try {
+        return ParquetFileReader.open(ParquetIO.file(file), options);
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to open Parquet file: %s", file.location());
+      }
+    }
+
+    ReadConf<T> copy() {
+      return new ReadConf<>(this);
+    }
+  }
+
+  private ReadConf conf = null;
+
+  private ReadConf init() {
+    if (conf == null) {
+      ReadConf readConf = new ReadConf(
+          input, options, expectedSchema, filter, batchReaderFunc, reuseContainers, caseSensitive, batchSize);
+      this.conf = readConf.copy();
+      return readConf;
+    }
+
+    return conf;
+  }
+
+  @Override
+  public Iterator iterator() {
+    FileIterator iter = new FileIterator(init());
+    addCloseable(iter);
+    return iter;
+  }
+
+  private static class FileIterator<T> implements Iterator<T>, Closeable {
+    private final ParquetFileReader reader;
+    private final boolean[] shouldSkip;
+    private final VectorizedReader<T> model;
+    private final long totalValues;
+    private final int batchSize;
+
+    private int nextRowGroup = 0;
+    private long nextRowGroupStart = 0;
+    private long valuesRead = 0;
+    private T last = null;
+
+    FileIterator(ReadConf conf) {
+      this.reader = conf.reader();
+      this.shouldSkip = conf.shouldSkip();
+      this.model = conf.model();
+      this.totalValues = conf.totalValues();
+      this.model.reuseContainers(conf.reuseContainers());
+      this.batchSize = conf.batchSize();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return valuesRead < totalValues;
+    }
+
+    @Override
+    public T next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      if (valuesRead >= nextRowGroupStart) {
+        advance();
+      }
+      this.last = model.read();
+      valuesRead += Math.min(nextRowGroupStart - valuesRead, batchSize);
+      return last;
+    }
+
+    private void advance() {
+      while (shouldSkip[nextRowGroup]) {
+        nextRowGroup += 1;
+        reader.skipNextRowGroup();
+      }
+
+      PageReadStore pages;
+      DictionaryPageReadStore dictionaryPageReadStore;
+      try {
+        dictionaryPageReadStore = reader.getNextDictionaryReader();
+        pages = reader.readNextRowGroup();
+      } catch (IOException e) {
+        throw new RuntimeIOException(e);
+      }
+
+      nextRowGroupStart += pages.getRowCount();
+      nextRowGroup += 1;
+      model.setRowGroupInfo(
+          pages,
+          dictionaryPageReadStore,
+          dictionaryPageReadStore == null ? null : buildColumnDictEncodedMap(reader.getRowGroups()));
 
 Review comment:
   Why does the same map need to be passed in for every row group?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361762375
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
 ##########
 @@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.InputFile;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Configuration for Parquet readers.
+ *
+ * @param <T> type of value to read
+ */
+class ReadConf<T> {
+  private final ParquetFileReader reader;
+  private final InputFile file;
+  private final ParquetReadOptions options;
+  private final MessageType projection;
+  @Nullable
+  private final ParquetValueReader<T> model;
+  @Nullable
+  private final VectorizedReader<T> vectorizedModel;
+  private final List<BlockMetaData> rowGroups;
+  private final boolean[] shouldSkip;
+  private final long totalValues;
+  private final boolean reuseContainers;
+  @Nullable
+  private final Integer batchSize;
+
+  // List of column chunk metadata for each row group
+  private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetaDataForRowGroups;
+
+  @SuppressWarnings("unchecked")
+  ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
+           Function<MessageType, ParquetValueReader<?>> readerFunc, Function<MessageType,
+           VectorizedReader<?>> batchedReaderFunc, boolean reuseContainers,
+           boolean caseSensitive, Integer bSize) {
+    this.file = file;
+    this.options = options;
+    this.reader = newReader(file, options);
+    MessageType fileSchema = reader.getFileMetaData().getSchema();
+
+    boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
+    MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
+
+    this.projection = hasIds ?
+        ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
+        ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+    this.rowGroups = reader.getRowGroups();
+    this.shouldSkip = new boolean[rowGroups.size()];
+
+    ParquetMetricsRowGroupFilter statsFilter = null;
+    ParquetDictionaryRowGroupFilter dictFilter = null;
+    if (filter != null) {
+      statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+      dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
+    }
+
+    long computedTotalValues = 0L;
+    for (int i = 0; i < shouldSkip.length; i += 1) {
+      BlockMetaData rowGroup = rowGroups.get(i);
+      boolean shouldRead = filter == null || (
+          statsFilter.shouldRead(typeWithIds, rowGroup) &&
+              dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
+      this.shouldSkip[i] = !shouldRead;
+      if (shouldRead) {
+        computedTotalValues += rowGroup.getRowCount();
+      }
+    }
+
+    this.totalValues = computedTotalValues;
+    if (readerFunc != null) {
+      this.model = (ParquetValueReader<T>) readerFunc.apply(typeWithIds);
+      this.vectorizedModel = null;
+      this.columnChunkMetaDataForRowGroups = null;
+    } else {
+      this.model = null;
+      this.vectorizedModel = (VectorizedReader<T>) batchedReaderFunc.apply(typeWithIds);
+      this.columnChunkMetaDataForRowGroups = getColumnChunkMetadataForRowGroups();
+    }
+
+    this.reuseContainers = reuseContainers;
+    this.batchSize = bSize;
+  }
+
+  private ReadConf(ReadConf<T> toCopy) {
+    this.reader = null;
+    this.file = toCopy.file;
+    this.options = toCopy.options;
+    this.projection = toCopy.projection;
+    this.model = toCopy.model;
+    this.rowGroups = toCopy.rowGroups;
+    this.shouldSkip = toCopy.shouldSkip;
+    this.totalValues = toCopy.totalValues;
+    this.reuseContainers = toCopy.reuseContainers;
+    this.batchSize = toCopy.batchSize;
+    this.vectorizedModel = toCopy.vectorizedModel;
+    this.columnChunkMetaDataForRowGroups = toCopy.columnChunkMetaDataForRowGroups;
+  }
+
+  ParquetFileReader reader() {
+    if (reader != null) {
+      reader.setRequestedSchema(projection);
+      return reader;
+    }
+
+    ParquetFileReader newReader = newReader(file, options);
+    newReader.setRequestedSchema(projection);
+    return newReader;
+  }
+
+  ParquetValueReader<T> model() {
+    return model;
+  }
+
+  VectorizedReader<T> vectorizedModel() {
+    return vectorizedModel;
+  }
+
+  boolean[] shouldSkip() {
+    return shouldSkip;
+  }
+
+  long totalValues() {
+    return totalValues;
+  }
+
+  boolean reuseContainers() {
+    return reuseContainers;
+  }
+
+  Integer batchSize() {
+    return batchSize;
+  }
+
+  List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadataForRowGroups() {
+    return columnChunkMetaDataForRowGroups;
+  }
+
+  ReadConf<T> copy() {
+    return new ReadConf<>(this);
+  }
+
+  private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) {
+    try {
+      return ParquetFileReader.open(ParquetIO.file(file), options);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to open Parquet file: %s", file.location());
+    }
+  }
+
+  private List<Map<ColumnPath, ColumnChunkMetaData>> getColumnChunkMetadataForRowGroups() {
+    Set<ColumnPath> projectedColumns = projection.getColumns().stream()
+        .map(columnDescriptor -> ColumnPath.get(columnDescriptor.getPath())).collect(Collectors.toSet());
+    ImmutableList.Builder builder = ImmutableList.<Map<ColumnPath, ColumnChunkMetaData>>builder();
+    for (int i = 0; i < rowGroups.size(); i++) {
+      if (!shouldSkip[i]) {
+        BlockMetaData blockMetaData = rowGroups.get(i);
+        Map<ColumnPath, ColumnChunkMetaData> map = new HashMap<>();
 
 Review comment:
   Can this map be immutable as well?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r362050520
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
 ##########
 @@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.InputFile;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Configuration for Parquet readers.
+ *
+ * @param <T> type of value to read
+ */
+class ReadConf<T> {
+  private final ParquetFileReader reader;
+  private final InputFile file;
+  private final ParquetReadOptions options;
+  private final MessageType projection;
+  @Nullable
+  private final ParquetValueReader<T> model;
+  @Nullable
+  private final VectorizedReader<T> vectorizedModel;
+  private final List<BlockMetaData> rowGroups;
+  private final boolean[] shouldSkip;
+  private final long totalValues;
+  private final boolean reuseContainers;
+  @Nullable
+  private final Integer batchSize;
+
+  // List of column chunk metadata for each row group
+  private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetaDataForRowGroups;
+
+  @SuppressWarnings("unchecked")
+  ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
+           Function<MessageType, ParquetValueReader<?>> readerFunc, Function<MessageType,
+           VectorizedReader<?>> batchedReaderFunc, boolean reuseContainers,
+           boolean caseSensitive, Integer bSize) {
+    this.file = file;
+    this.options = options;
+    this.reader = newReader(file, options);
+    MessageType fileSchema = reader.getFileMetaData().getSchema();
+
+    boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
+    MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
+
+    this.projection = hasIds ?
+        ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
+        ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+    this.rowGroups = reader.getRowGroups();
+    this.shouldSkip = new boolean[rowGroups.size()];
+
+    ParquetMetricsRowGroupFilter statsFilter = null;
+    ParquetDictionaryRowGroupFilter dictFilter = null;
+    if (filter != null) {
+      statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+      dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
+    }
+
+    long computedTotalValues = 0L;
+    for (int i = 0; i < shouldSkip.length; i += 1) {
+      BlockMetaData rowGroup = rowGroups.get(i);
+      boolean shouldRead = filter == null || (
+          statsFilter.shouldRead(typeWithIds, rowGroup) &&
+              dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
+      this.shouldSkip[i] = !shouldRead;
+      if (shouldRead) {
+        computedTotalValues += rowGroup.getRowCount();
+      }
+    }
+
+    this.totalValues = computedTotalValues;
+    if (readerFunc != null) {
+      this.model = (ParquetValueReader<T>) readerFunc.apply(typeWithIds);
+      this.vectorizedModel = null;
+      this.columnChunkMetaDataForRowGroups = null;
+    } else {
+      this.model = null;
+      this.vectorizedModel = (VectorizedReader<T>) batchedReaderFunc.apply(typeWithIds);
+      this.columnChunkMetaDataForRowGroups = getColumnChunkMetadataForRowGroups();
+    }
+
+    this.reuseContainers = reuseContainers;
+    this.batchSize = bSize;
+  }
+
+  private ReadConf(ReadConf<T> toCopy) {
+    this.reader = null;
+    this.file = toCopy.file;
+    this.options = toCopy.options;
+    this.projection = toCopy.projection;
+    this.model = toCopy.model;
+    this.rowGroups = toCopy.rowGroups;
+    this.shouldSkip = toCopy.shouldSkip;
+    this.totalValues = toCopy.totalValues;
+    this.reuseContainers = toCopy.reuseContainers;
+    this.batchSize = toCopy.batchSize;
+    this.vectorizedModel = toCopy.vectorizedModel;
+    this.columnChunkMetaDataForRowGroups = toCopy.columnChunkMetaDataForRowGroups;
+  }
+
+  ParquetFileReader reader() {
+    if (reader != null) {
+      reader.setRequestedSchema(projection);
+      return reader;
+    }
+
+    ParquetFileReader newReader = newReader(file, options);
+    newReader.setRequestedSchema(projection);
+    return newReader;
+  }
+
+  ParquetValueReader<T> model() {
+    return model;
+  }
+
+  VectorizedReader<T> vectorizedModel() {
+    return vectorizedModel;
+  }
+
+  boolean[] shouldSkip() {
+    return shouldSkip;
+  }
+
+  long totalValues() {
+    return totalValues;
+  }
+
+  boolean reuseContainers() {
+    return reuseContainers;
+  }
+
+  Integer batchSize() {
 
 Review comment:
   I just noticed this is an `Integer` instead of `int`. Can we change this to the primitive so that we don't have to worry about null pointer exceptions? Just set it to a default for non-batch cases, or use a precondition in this method to check whether the batch reader function is set.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] samarthjain commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
samarthjain commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361555828
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetReader.java
 ##########
 @@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet.vectorized;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.ParquetDictionaryRowGroupFilter;
+import org.apache.iceberg.parquet.ParquetIO;
+import org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+public class VectorizedParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {
+  private final InputFile input;
+  private final Schema expectedSchema;
+  private final ParquetReadOptions options;
+  private final Function<MessageType, VectorizedReader<T>> batchReaderFunc;
+  private final Expression filter;
+  private final boolean reuseContainers;
+  private final boolean caseSensitive;
+  private final int batchSize;
+
+  public VectorizedParquetReader(
+      InputFile input, Schema expectedSchema, ParquetReadOptions options,
+      Function<MessageType, VectorizedReader<T>> readerFunc,
+      Expression filter, boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) {
+    this.input = input;
+    this.expectedSchema = expectedSchema;
+    this.options = options;
+    this.batchReaderFunc = readerFunc;
+    // replace alwaysTrue with null to avoid extra work evaluating a trivial filter
+    this.filter = filter == Expressions.alwaysTrue() ? null : filter;
+    this.reuseContainers = reuseContainers;
+    this.caseSensitive = caseSensitive;
+    this.batchSize = maxRecordsPerBatch;
+  }
+
+  private static class ReadConf<T> {
+    private final ParquetFileReader reader;
+    private final InputFile file;
+    private final ParquetReadOptions options;
+    private final MessageType projection;
+    private final VectorizedReader<T> model;
+    private final List<BlockMetaData> rowGroups;
+    private final boolean[] shouldSkip;
+    private final long totalValues;
+    private final boolean reuseContainers;
+    private final int batchSize;
+
+    @SuppressWarnings("unchecked")
+    ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
+        Function<MessageType, VectorizedReader<T>> readerFunc, boolean reuseContainers,
+        boolean caseSensitive, int bSize) {
+      this.file = file;
+      this.options = options;
+      this.reader = newReader(file, options);
+
+      MessageType fileSchema = reader.getFileMetaData().getSchema();
+
+      boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
+      MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
+
+      this.projection = hasIds ?
+          ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
+          ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+      this.model = readerFunc.apply(typeWithIds);
+      this.rowGroups = reader.getRowGroups();
+      this.shouldSkip = new boolean[rowGroups.size()];
+
+      ParquetMetricsRowGroupFilter statsFilter = null;
+      ParquetDictionaryRowGroupFilter dictFilter = null;
+      if (filter != null) {
+        statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+        dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
+      }
+
+      long computedTotalValues = 0L;
+      for (int i = 0; i < shouldSkip.length; i += 1) {
+        BlockMetaData rowGroup = rowGroups.get(i);
+        boolean shouldRead = filter == null || (
+            statsFilter.shouldRead(typeWithIds, rowGroup) &&
+                dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
+        this.shouldSkip[i] = !shouldRead;
+        if (shouldRead) {
+          computedTotalValues += rowGroup.getRowCount();
+        }
+      }
+
+      this.totalValues = computedTotalValues;
+      this.reuseContainers = reuseContainers;
+      this.batchSize = bSize;
+    }
+
+    ReadConf(ReadConf<T> toCopy) {
+      this.reader = null;
+      this.file = toCopy.file;
+      this.options = toCopy.options;
+      this.projection = toCopy.projection;
+      this.model = toCopy.model;
+      this.rowGroups = toCopy.rowGroups;
+      this.shouldSkip = toCopy.shouldSkip;
+      this.totalValues = toCopy.totalValues;
+      this.reuseContainers = toCopy.reuseContainers;
+      this.batchSize = toCopy.batchSize;
+    }
+
+    ParquetFileReader reader() {
+      if (reader != null) {
+        reader.setRequestedSchema(projection);
+        return reader;
+      }
+
+      ParquetFileReader newReader = newReader(file, options);
+      newReader.setRequestedSchema(projection);
+      return newReader;
+    }
+
+    VectorizedReader model() {
+      return model;
+    }
+
+    boolean[] shouldSkip() {
+      return shouldSkip;
+    }
+
+    long totalValues() {
+      return totalValues;
+    }
+
+    boolean reuseContainers() {
+      return reuseContainers;
+    }
+
+    int batchSize() {
+      return batchSize;
+    }
+
+    private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) {
+      try {
+        return ParquetFileReader.open(ParquetIO.file(file), options);
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to open Parquet file: %s", file.location());
+      }
+    }
+
+    ReadConf<T> copy() {
+      return new ReadConf<>(this);
+    }
+  }
+
+  private ReadConf conf = null;
+
+  private ReadConf init() {
+    if (conf == null) {
+      ReadConf readConf = new ReadConf(
+          input, options, expectedSchema, filter, batchReaderFunc, reuseContainers, caseSensitive, batchSize);
+      this.conf = readConf.copy();
+      return readConf;
+    }
+
+    return conf;
+  }
+
+  @Override
+  public Iterator iterator() {
+    FileIterator iter = new FileIterator(init());
+    addCloseable(iter);
+    return iter;
+  }
+
+  private static class FileIterator<T> implements Iterator<T>, Closeable {
+    private final ParquetFileReader reader;
+    private final boolean[] shouldSkip;
+    private final VectorizedReader<T> model;
+    private final long totalValues;
+    private final int batchSize;
+
+    private int nextRowGroup = 0;
+    private long nextRowGroupStart = 0;
+    private long valuesRead = 0;
+    private T last = null;
+
+    FileIterator(ReadConf conf) {
+      this.reader = conf.reader();
+      this.shouldSkip = conf.shouldSkip();
+      this.model = conf.model();
+      this.totalValues = conf.totalValues();
+      this.model.reuseContainers(conf.reuseContainers());
+      this.batchSize = conf.batchSize();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return valuesRead < totalValues;
+    }
+
+    @Override
+    public T next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      if (valuesRead >= nextRowGroupStart) {
+        advance();
+      }
+      this.last = model.read();
+      valuesRead += Math.min(nextRowGroupStart - valuesRead, batchSize);
+      return last;
+    }
+
+    private void advance() {
+      while (shouldSkip[nextRowGroup]) {
+        nextRowGroup += 1;
+        reader.skipNextRowGroup();
+      }
+
+      PageReadStore pages;
+      DictionaryPageReadStore dictionaryPageReadStore;
+      try {
+        dictionaryPageReadStore = reader.getNextDictionaryReader();
+        pages = reader.readNextRowGroup();
+      } catch (IOException e) {
+        throw new RuntimeIOException(e);
+      }
+
+      nextRowGroupStart += pages.getRowCount();
+      nextRowGroup += 1;
+      model.setRowGroupInfo(
+          pages,
+          dictionaryPageReadStore,
+          dictionaryPageReadStore == null ? null : buildColumnDictEncodedMap(reader.getRowGroups()));
 
 Review comment:
   I moved the creation of the map in the ReadConf when the ParquetFileReader is initialized for the file. However, I think I still need to have the setRowGroupInfo method to pass in the ColumnPath->Boolean map. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361017406
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
 ##########
 @@ -362,6 +367,16 @@ public ReadBuilder createReaderFunc(Function<MessageType, ParquetValueReader<?>>
       return this;
     }
 
+    public ReadBuilder createBatchedReaderFunc(Function<MessageType, VectorizedReader<?>> func) {
+      this.batchedReaderFunc = func;
+      return this;
+    }
+
+    public ReadBuilder enableBatchedRead() {
 
 Review comment:
   Why not enable batch read when `createBatchReaderFunc` is called? If that's null, then use the other read path. If it is defined, then use the vectorized path.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361724241
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
 ##########
 @@ -362,6 +364,11 @@ public ReadBuilder createReaderFunc(Function<MessageType, ParquetValueReader<?>>
       return this;
     }
 
+    public ReadBuilder createBatchedReaderFunc(Function<MessageType, VectorizedReader<?>> func) {
 
 Review comment:
   Can you add preconditions to validate that the other reader func is not set, and one in the other method for when this one is already set? That way we throw an `IllegalArgumentException` when both are called and we don't have to worry about what to do when the user configures two object models.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361723761
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
 ##########
 @@ -59,122 +57,15 @@ public ParquetReader(InputFile input, Schema expectedSchema, ParquetReadOptions
     this.caseSensitive = caseSensitive;
   }
 
-  private static class ReadConf<T> {
-    private final ParquetFileReader reader;
-    private final InputFile file;
-    private final ParquetReadOptions options;
-    private final MessageType projection;
-    private final ParquetValueReader<T> model;
-    private final List<BlockMetaData> rowGroups;
-    private final boolean[] shouldSkip;
-    private final long totalValues;
-    private final boolean reuseContainers;
-
-    @SuppressWarnings("unchecked")
-    ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
-             Function<MessageType, ParquetValueReader<?>> readerFunc, boolean reuseContainers,
-             boolean caseSensitive) {
-      this.file = file;
-      this.options = options;
-      this.reader = newReader(file, options);
-
-      MessageType fileSchema = reader.getFileMetaData().getSchema();
-
-      boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
-      MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
-
-      this.projection = hasIds ?
-          ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
-          ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
-      this.model = (ParquetValueReader<T>) readerFunc.apply(typeWithIds);
-      this.rowGroups = reader.getRowGroups();
-      this.shouldSkip = new boolean[rowGroups.size()];
-
-      ParquetMetricsRowGroupFilter statsFilter = null;
-      ParquetDictionaryRowGroupFilter dictFilter = null;
-      if (filter != null) {
-        statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
-        dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
-      }
-
-      long computedTotalValues = 0L;
-      for (int i = 0; i < shouldSkip.length; i += 1) {
-        BlockMetaData rowGroup = rowGroups.get(i);
-        boolean shouldRead = filter == null || (
-            statsFilter.shouldRead(typeWithIds, rowGroup) &&
-            dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
-        this.shouldSkip[i] = !shouldRead;
-        if (shouldRead) {
-          computedTotalValues += rowGroup.getRowCount();
-        }
-      }
-
-      this.totalValues = computedTotalValues;
-      this.reuseContainers = reuseContainers;
-    }
-
-    ReadConf(ReadConf<T> toCopy) {
-      this.reader = null;
-      this.file = toCopy.file;
-      this.options = toCopy.options;
-      this.projection = toCopy.projection;
-      this.model = toCopy.model;
-      this.rowGroups = toCopy.rowGroups;
-      this.shouldSkip = toCopy.shouldSkip;
-      this.totalValues = toCopy.totalValues;
-      this.reuseContainers = toCopy.reuseContainers;
-    }
-
-    ParquetFileReader reader() {
-      if (reader != null) {
-        reader.setRequestedSchema(projection);
-        return reader;
-      }
-
-      ParquetFileReader newReader = newReader(file, options);
-      newReader.setRequestedSchema(projection);
-      return newReader;
-    }
-
-    ParquetValueReader<T> model() {
-      return model;
-    }
-
-    boolean[] shouldSkip() {
-      return shouldSkip;
-    }
-
-    long totalValues() {
-      return totalValues;
-    }
-
-    boolean reuseContainers() {
-      return reuseContainers;
-    }
-
-    ReadConf<T> copy() {
-      return new ReadConf<>(this);
-    }
-
-    private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) {
-      try {
-        return ParquetFileReader.open(ParquetIO.file(file), options);
-      } catch (IOException e) {
-        throw new RuntimeIOException(e, "Failed to open Parquet file: %s", file.location());
-      }
-    }
-  }
-
   private ReadConf<T> conf = null;
 
   private ReadConf<T> init() {
     if (conf == null) {
-      ReadConf<T> readConf = new ReadConf<>(
-          input, options, expectedSchema, filter, readerFunc, reuseContainers, caseSensitive);
+      ReadConf<T> readConf = new ReadConf<T>(
 
 Review comment:
   Why was it necessary to add the type variable here? Java should be able to infer it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361020283
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedReader.java
 ##########
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet.vectorized;
+
+import java.util.Map;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/**
+ * Interface for vectorized Iceberg readers.
+ */
+public interface VectorizedReader<T> {
+  T read();
+
+  void setRowGroupInfo(PageReadStore pages, DictionaryPageReadStore dictionaryPageReadStore,
+                       Map<ColumnPath, Boolean> columnPathBooleanMap);
+
+  void reuseContainers(boolean reuse);
 
 Review comment:
   If the container isn't passed into this interface, then the reader implementation needs to be responsible for keeping a reference to the last result (T) that was returned. That seems a little strange to me because it is a separate concern. I'm okay with this, but we may want to refactor later.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361762218
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
 ##########
 @@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.InputFile;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Configuration for Parquet readers.
+ *
+ * @param <T> type of value to read
+ */
+class ReadConf<T> {
+  private final ParquetFileReader reader;
+  private final InputFile file;
+  private final ParquetReadOptions options;
+  private final MessageType projection;
+  @Nullable
+  private final ParquetValueReader<T> model;
+  @Nullable
+  private final VectorizedReader<T> vectorizedModel;
+  private final List<BlockMetaData> rowGroups;
+  private final boolean[] shouldSkip;
+  private final long totalValues;
+  private final boolean reuseContainers;
+  @Nullable
+  private final Integer batchSize;
+
+  // List of column chunk metadata for each row group
+  private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetaDataForRowGroups;
+
+  @SuppressWarnings("unchecked")
+  ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
+           Function<MessageType, ParquetValueReader<?>> readerFunc, Function<MessageType,
+           VectorizedReader<?>> batchedReaderFunc, boolean reuseContainers,
+           boolean caseSensitive, Integer bSize) {
+    this.file = file;
+    this.options = options;
+    this.reader = newReader(file, options);
+    MessageType fileSchema = reader.getFileMetaData().getSchema();
+
+    boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
+    MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
+
+    this.projection = hasIds ?
+        ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
+        ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+    this.rowGroups = reader.getRowGroups();
+    this.shouldSkip = new boolean[rowGroups.size()];
+
+    ParquetMetricsRowGroupFilter statsFilter = null;
+    ParquetDictionaryRowGroupFilter dictFilter = null;
+    if (filter != null) {
+      statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+      dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
+    }
+
+    long computedTotalValues = 0L;
+    for (int i = 0; i < shouldSkip.length; i += 1) {
+      BlockMetaData rowGroup = rowGroups.get(i);
+      boolean shouldRead = filter == null || (
+          statsFilter.shouldRead(typeWithIds, rowGroup) &&
+              dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
+      this.shouldSkip[i] = !shouldRead;
+      if (shouldRead) {
+        computedTotalValues += rowGroup.getRowCount();
+      }
+    }
+
+    this.totalValues = computedTotalValues;
+    if (readerFunc != null) {
+      this.model = (ParquetValueReader<T>) readerFunc.apply(typeWithIds);
+      this.vectorizedModel = null;
+      this.columnChunkMetaDataForRowGroups = null;
+    } else {
+      this.model = null;
+      this.vectorizedModel = (VectorizedReader<T>) batchedReaderFunc.apply(typeWithIds);
+      this.columnChunkMetaDataForRowGroups = getColumnChunkMetadataForRowGroups();
+    }
+
+    this.reuseContainers = reuseContainers;
+    this.batchSize = bSize;
+  }
+
+  private ReadConf(ReadConf<T> toCopy) {
+    this.reader = null;
+    this.file = toCopy.file;
+    this.options = toCopy.options;
+    this.projection = toCopy.projection;
+    this.model = toCopy.model;
+    this.rowGroups = toCopy.rowGroups;
+    this.shouldSkip = toCopy.shouldSkip;
+    this.totalValues = toCopy.totalValues;
+    this.reuseContainers = toCopy.reuseContainers;
+    this.batchSize = toCopy.batchSize;
+    this.vectorizedModel = toCopy.vectorizedModel;
+    this.columnChunkMetaDataForRowGroups = toCopy.columnChunkMetaDataForRowGroups;
+  }
+
+  ParquetFileReader reader() {
+    if (reader != null) {
+      reader.setRequestedSchema(projection);
+      return reader;
+    }
+
+    ParquetFileReader newReader = newReader(file, options);
+    newReader.setRequestedSchema(projection);
+    return newReader;
+  }
+
+  ParquetValueReader<T> model() {
+    return model;
+  }
+
+  VectorizedReader<T> vectorizedModel() {
+    return vectorizedModel;
+  }
+
+  boolean[] shouldSkip() {
+    return shouldSkip;
+  }
+
+  long totalValues() {
+    return totalValues;
+  }
+
+  boolean reuseContainers() {
+    return reuseContainers;
+  }
+
+  Integer batchSize() {
+    return batchSize;
+  }
+
+  List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadataForRowGroups() {
+    return columnChunkMetaDataForRowGroups;
+  }
+
+  ReadConf<T> copy() {
+    return new ReadConf<>(this);
+  }
+
+  private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) {
+    try {
+      return ParquetFileReader.open(ParquetIO.file(file), options);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to open Parquet file: %s", file.location());
+    }
+  }
+
+  private List<Map<ColumnPath, ColumnChunkMetaData>> getColumnChunkMetadataForRowGroups() {
+    Set<ColumnPath> projectedColumns = projection.getColumns().stream()
+        .map(columnDescriptor -> ColumnPath.get(columnDescriptor.getPath())).collect(Collectors.toSet());
+    ImmutableList.Builder builder = ImmutableList.<Map<ColumnPath, ColumnChunkMetaData>>builder();
 
 Review comment:
   Shouldn't the `Builder` have type parameters instead of the `builder()` call?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r362050170
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java
 ##########
 @@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.util.Map;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/**
+ * Interface for vectorized Iceberg readers.
+ */
+public interface VectorizedReader<T> {
+
+  /**
+   * Reads a batch of type @param &lt;T&gt; and of size numRows
+   * @param numRows number of rows to read
+   * @return batch of records of type @param &lt;T&gt;
+   */
+  T read(int numRows);
+
+  /**
+   *
 
 Review comment:
   This is missing a description.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on issue #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on issue #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#issuecomment-569747226
 
 
   @samarthjain, thanks for fixing this! The remaining changes are minor so I merged this and we can fix them in a follow-up.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #710: Parquet changes for vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #710: Parquet changes for vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/710#discussion_r361017276
 
 

 ##########
 File path: parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
 ##########
 @@ -456,6 +483,8 @@ public ReadBuilder reuseContainers() {
 
       return new ParquetIterable<>(builder);
     }
+
+
 
 Review comment:
   Nit: no need to change these lines.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org