You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@orc.apache.org by GitBox <gi...@apache.org> on 2021/02/18 14:04:12 UTC

[GitHub] [orc] pgaref commented on a change in pull request #635: ORC-742: LazyIO for non-filter columns

pgaref commented on a change in pull request #635:
URL: https://github.com/apache/orc/pull/635#discussion_r578392205



##########
File path: java/core/src/java/org/apache/orc/filter/OrcFilterContext.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.filter;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.filter.MutableFilterContext;
+import org.apache.orc.TypeDescription;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * This defines the input for any filter operation.
+ * <p>
+ * It offers a convenience method for finding the column vector from a given name, that the filters
+ * can invoke to get access to the column vector.
+ */
+public class OrcFilterContext implements MutableFilterContext {
+
+  VectorizedRowBatch batch = null;
+  // Cache of field to ColumnVector, this is reset everytime the batch reference changes
+  private final Map<String, ColumnVector> vectors;
+  private final TypeDescription readSchema;
+
+  public OrcFilterContext(TypeDescription readSchema) {
+    this.readSchema = readSchema;
+    this.vectors = new HashMap<>();
+  }
+
+  public OrcFilterContext setBatch(@NotNull VectorizedRowBatch batch) {
+    if (batch != this.batch) {
+      this.batch = batch;
+      vectors.clear();
+    }
+    return this;
+  }
+
+  @Override
+  public void setFilterContext(boolean selectedInUse, int[] selected, int selectedSize) {
+    batch.setFilterContext(selectedInUse, selected, selectedSize);
+  }
+
+  @Override
+  public boolean validateSelected() {
+    return batch.validateSelected();
+  }
+
+  @Override
+  public int[] updateSelected(int i) {
+    return batch.updateSelected(i);
+  }
+
+  @Override
+  public void setSelectedInUse(boolean b) {
+    batch.setSelectedInUse(b);
+  }
+
+  @Override
+  public void setSelected(int[] ints) {
+    batch.setSelected(ints);
+  }
+
+  @Override
+  public void setSelectedSize(int i) {
+    batch.setSelectedSize(i);
+  }
+
+  @Override
+  public void reset() {
+    batch.reset();
+  }
+
+  @Override
+  public boolean isSelectedInUse() {
+    return batch.isSelectedInUse();
+  }
+
+  @Override
+  public int[] getSelected() {
+    return batch.getSelected();
+  }
+
+  @Override
+  public int getSelectedSize() {
+    return batch.getSelectedSize();
+  }
+
+  public ColumnVector[] getCols() {
+    return batch.cols;
+  }
+
+  /**
+   * Retrieves the column vector that matches the specified name. Allows support for nested struct
+   * references e.g. order.date where data is a field in a struct called order.
+   *
+   * @param name The column name whose vector should be retrieved
+   * @return The column vector
+   * @throws IllegalArgumentException if the field is not found or if the nested field is not part
+   *                                  of a struct
+   */
+  public ColumnVector findColumnVector(String name) {
+    if (!vectors.containsKey(name)) {
+      vectors.put(name, findVector(name));
+    }
+
+    return vectors.get(name);
+  }
+
+  private ColumnVector findVector(String name) {
+    String[] refs = name.split(SPLIT_ON_PERIOD);

Review comment:
       We already have Utility methods that can be used here, for example: ParserUtils.splitName
   Actually, RecordReaderImpl.findColumns does everything you need under the hood -- you just need to do the extra check for the Struct Type

##########
File path: java/core/src/java/org/apache/orc/filter/OrcFilterContext.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.filter;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.filter.MutableFilterContext;
+import org.apache.orc.TypeDescription;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * This defines the input for any filter operation.
+ * <p>
+ * It offers a convenience method for finding the column vector from a given name, that the filters
+ * can invoke to get access to the column vector.
+ */
+public class OrcFilterContext implements MutableFilterContext {

Review comment:
       In fact this is an extension of a VectorizedRowBatch with schema -- we cant conveniently extend VectorizedRowBatch but we should mention the above in the doc

##########
File path: java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
##########
@@ -245,7 +263,8 @@ protected static IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind
       switch (kind) {
         case DIRECT_V2:
         case DICTIONARY_V2:
-          return new RunLengthIntegerReaderV2(in, signed, context == null ? false : context.isSkipCorrupt());
+          return new RunLengthIntegerReaderV2(in, signed,

Review comment:
       This change is also unrelated -- I would suggest having this part of the Reader changes. Will add more details on my review comment

##########
File path: java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
##########
@@ -1221,52 +1244,112 @@ private boolean advanceToNextRow(
   @Override
   public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
     try {
-      if (rowInStripe >= rowCountInStripe) {
-        currentStripe += 1;
-        if (currentStripe >= stripes.size()) {
-          batch.size = 0;
-          return false;
+      int batchSize;
+
+      // do...while is required to handle the case where the filter eliminates all rows in the
+      // batch
+      do {
+        if (rowInStripe >= rowCountInStripe) {
+          currentStripe += 1;
+          if (currentStripe >= stripes.size()) {
+            batch.size = 0;
+            return false;
+          }
+          // Read stripe in Memory
+          readStripe();
+          followRowInStripe = rowInStripe;
         }
-        // Read stripe in Memory
-        readStripe();
-      }
 
-      int batchSize = computeBatchSize(batch.getMaxSize());
-      rowInStripe += batchSize;
-      reader.setVectorColumnCount(batch.getDataColumnCount());
-      reader.nextBatch(batch, batchSize);
-      advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
-      // batch.size can be modified by filter so only batchSize can tell if we actually read rows
+        batchSize = computeBatchSize(batch.getMaxSize());
+        reader.setVectorColumnCount(batch.getDataColumnCount());
+        reader.nextBatch(batch, batchSize, readLevel);
+        if (readLevel == ReadLevel.LEAD && batch.size > 0) {
+          prepareFollowingStreams(rowInStripe, followRowInStripe);

Review comment:
       comment here please

##########
File path: java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
##########
@@ -83,17 +85,21 @@
   private final boolean[] fileIncluded;
   private final long rowIndexStride;
   private long rowInStripe = 0;
+  private long followRowInStripe = 0;
   private int currentStripe = -1;
   private long rowBaseInStripe = 0;
   private long rowCountInStripe = 0;
   private final BatchReader reader;
   private final OrcIndex indexes;
+  private final boolean[] rowIndexCols;

Review comment:
       This is still not 100% clear -- maybe rowIndexColsToRead?
   As everything evolves around ReadLevel lets make use of it

##########
File path: java/core/src/java/org/apache/orc/impl/reader/tree/StructBatchReader.java
##########
@@ -15,62 +15,80 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.orc.impl.reader.tree;
 
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.filter.OrcFilterContext;
 import org.apache.orc.impl.TreeReaderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Set;
 
 public class StructBatchReader extends BatchReader {
+  private static final Logger LOG = LoggerFactory.getLogger(StructBatchReader.class);
   // The reader context including row-filtering details
   private final TreeReaderFactory.Context context;
+  private final TreeReaderFactory.StructTreeReader structReader;
+  private final OrcFilterContext fc;
 
-  public StructBatchReader(TreeReaderFactory.StructTreeReader rowReader, TreeReaderFactory.Context context) {
+  public StructBatchReader(TypeReader rowReader, TreeReaderFactory.Context context) {
     super(rowReader);
     this.context = context;
+    this.fc = new OrcFilterContext(context.getSchemaEvolution().getReaderSchema());
+    if (rowReader instanceof TreeReaderFactory.StructTreeReader) {
+      structReader = (TreeReaderFactory.StructTreeReader) rowReader;
+    } else {
+      structReader = (TreeReaderFactory.StructTreeReader) ((LevelTypeReader)rowReader).getReader();
+    }
   }
 
-  private void readBatchColumn(VectorizedRowBatch batch, TypeReader[] children, int batchSize, int index)
-      throws IOException {
+  private void readBatchColumn(VectorizedRowBatch batch,
+                               TypeReader[] children,
+                               int batchSize,
+                               int index,
+                               ReadLevel readLevel)
+    throws IOException {
     ColumnVector colVector = batch.cols[index];
     if (colVector != null) {
       colVector.reset();
       colVector.ensureSize(batchSize, false);
-      children[index].nextVector(colVector, null, batchSize, batch);
+      children[index].nextVector(colVector, null, batchSize, batch, readLevel);
     }
   }
 
   @Override
-  public void nextBatch(VectorizedRowBatch batch, int batchSize) throws IOException {
-    TypeReader[] children = ((TreeReaderFactory.StructTreeReader) rootType).fields;
-    // Early expand fields --> apply filter --> expand remaining fields
-    Set<Integer> earlyExpandCols = context.getColumnFilterIds();
+  public void nextBatch(VectorizedRowBatch batch, int batchSize, ReadLevel readLevel)
+    throws IOException {
+    nextBatchLevel(batch, batchSize, readLevel);
 
-    // Clear selected and early expand columns used in Filter
-    batch.selectedInUse = false;
-    for (int i = 0; i < children.length && !earlyExpandCols.isEmpty() &&
-        (vectorColumnCount == -1 || i < vectorColumnCount); ++i) {
-      if (earlyExpandCols.contains(children[i].getColumnId())) {
-        readBatchColumn(batch, children, batchSize, i);
+    if (readLevel == ReadLevel.LEAD) {
+      // Apply filter callback to reduce number of # rows selected for decoding in the next
+      // TreeReaders
+      if (this.context.getColumnFilterCallback() != null) {
+        this.context.getColumnFilterCallback().accept(fc.setBatch(batch));
       }
     }
-    // Since we are going to filter rows based on some column values set batch.size earlier here
-    batch.size = batchSize;
+  }
+
+  private void nextBatchLevel(VectorizedRowBatch batch, int batchSize, ReadLevel readLevel) throws IOException {
+    TypeReader[] children = structReader.fields;
 
-    // Apply filter callback to reduce number of # rows selected for decoding in the next TreeReaders
-    if (!earlyExpandCols.isEmpty() && this.context.getColumnFilterCallback() != null) {
-      this.context.getColumnFilterCallback().accept(batch);
+    if (readLevel != ReadLevel.FOLLOW) {
+      // In case of FOLLOW we leave the selectedInUse untouched.
+      batch.selectedInUse = false;

Review comment:
       Is this the right place to reset batch.selectedInUse? I would still keep it as a single method

##########
File path: java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
##########
@@ -234,11 +246,17 @@ protected RecordReaderImpl(ReaderImpl fileReader,
         // If the column is not present in the file then this can be ignored from read.
         if (expandColId != -1) {
           filterColIds.add(expandColId);
+          rowIndexCols[expandColId] = true;
         }
       }
       LOG.info("Filter Columns: " + filterColIds);
+      this.readLevel = ReadLevel.LEAD;
+    } else {
+      this.readLevel = ReadLevel.ALL;

Review comment:
       Maybe its cleaner to make ReadLevel.ALL the default init value (non-final) and then change it here to READ only when filters are enabled?

##########
File path: java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java
##########
@@ -68,14 +74,15 @@
   private String writerTimezone;
   private long currentStripeId;
   private long originalStripeId;
-  private Map<StreamName, StreamInformation> streams = new HashMap<>();
+  private final Map<StreamName, StreamInformation> streams = new HashMap<>();

Review comment:
       unrelated

##########
File path: java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java
##########
@@ -363,11 +396,26 @@ private void addChunk(BufferChunkList list, StreamInformation stream,
         stream.firstChunk = chunk;
       }
       list.add(chunk);
+      stream.lastChunk = chunk;
       offset += thisLen;
       length -= thisLen;
     }
   }
 
+  private BufferChunkList addFollowChunk(long offset, long length) {

Review comment:
       Method never used? 

##########
File path: java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
##########
@@ -83,17 +85,24 @@
   private final boolean[] fileIncluded;
   private final long rowIndexStride;
   private long rowInStripe = 0;
+  private long followRowInStripe = 0;
   private int currentStripe = -1;
   private long rowBaseInStripe = 0;
   private long rowCountInStripe = 0;
   private final BatchReader reader;
   private final OrcIndex indexes;
+  // identifies the columns requiring row indexes
+  private final boolean[] rowIndexCols;
   private final SargApplier sargApp;
   // an array about which row groups aren't skipped
   private boolean[] includedRowGroups = null;
   private final DataReader dataReader;
   private final int maxDiskRangeChunkLimit;
   private final StripePlanner planner;
+  // identifies the type of read: FULL (no filters), LEAD (filters are present)
+  private final ReadLevel readLevel;

Review comment:
       Seems there is a FOLLOW type as well..

##########
File path: java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
##########
@@ -748,11 +763,11 @@ private static TruthValue checkInBloomFilter(BloomFilter bf,
     TruthValue result = hasNull ? TruthValue.NO_NULL : TruthValue.NO;
 
     if (predObj instanceof Long) {
-      if (bf.testLong(((Long) predObj).longValue())) {
+      if (bf.testLong((Long) predObj)) {

Review comment:
       All pred changes below seem unrelated?

##########
File path: java/core/src/java/org/apache/orc/impl/BufferChunkList.java
##########
@@ -22,6 +22,14 @@
  * Builds a list of buffer chunks
  */
 public class BufferChunkList {
+  public BufferChunk getHead() {

Review comment:
       Are these changes needed? addFollowChunk method that used Head is never called

##########
File path: java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
##########
@@ -257,8 +275,8 @@ protected RecordReaderImpl(ReaderImpl fileReader,
         new OrcProto.BloomFilterIndex[columns]);
 
     planner = new StripePlanner(evolution.getFileSchema(), encryption,
-        dataReader, writerVersion, ignoreNonUtf8BloomFilter,
-        maxDiskRangeChunkLimit);
+                                dataReader, writerVersion, ignoreNonUtf8BloomFilter,

Review comment:
       indentation seems a bit off?

##########
File path: java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
##########
@@ -2833,8 +2904,7 @@ public static TypeReader createTreeReader(TypeDescription readerType,
       case DATE:
         return new DateTreeReader(fileType.getId(), context);
       case DECIMAL:
-        if (version == OrcFile.Version.UNSTABLE_PRE_2_0 &&
-            fileType.getPrecision() <= TypeDescription.MAX_DECIMAL64_PRECISION){
+        if (isDecimalAsLong(version, fileType.getPrecision())){

Review comment:
       Unrelated change..separate PR?

##########
File path: java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java
##########
@@ -402,10 +450,18 @@ private BufferChunkList planIndexReading(boolean[] bloomFilterColumns) {
    * data.
    * @return a list of merged disk ranges to read
    */
-  private BufferChunkList planDataReading() {
+  private BufferChunkList planDataReading(ReadLevel readLevel) {
     BufferChunkList result = new BufferChunkList();
     for(StreamInformation stream: dataStreams) {
-      addChunk(result, stream, stream.offset, stream.length);
+      if (filterColIds.isEmpty()

Review comment:
       indentation? 

##########
File path: java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
##########
@@ -1170,6 +1189,10 @@ private void advanceStripe() throws IOException {
     }
   }
 
+  private int computeRGIdx(long rowIdx) {

Review comment:
       Lets add doc here

##########
File path: java/core/src/java/org/apache/orc/impl/reader/tree/ReadLevel.java
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.reader.tree;
+
+public enum ReadLevel {

Review comment:
       Shall we make this an inner class of LevelTypeReader?

##########
File path: java/core/src/java/org/apache/orc/impl/reader/tree/LevelStructBatchReader.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.reader.tree;
+
+import org.apache.orc.impl.TreeReaderFactory;
+
+public class LevelStructBatchReader extends StructBatchReader {

Review comment:
       Shall we make this an inner class of StructBatchReader? 

##########
File path: java/core/src/test/org/apache/orc/TestRowFilteringIOSkip.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.filter.OrcFilterContext;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Consumer;
+
+public class TestRowFilteringIOSkip {
+  private final static Logger LOG = LoggerFactory.getLogger(TestRowFilteringIOSkip.class);
+  private static final Path workDir = new Path(System.getProperty("test.tmp.dir",
+                                                                  "target" + File.separator + "test"

Review comment:
       indentation seems off in here as well

##########
File path: java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
##########
@@ -545,7 +545,7 @@ public int hashCode() {
 
     private long currentGeneration = 0;
 
-    private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {

Review comment:
       I believe its the latter

##########
File path: java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
##########
@@ -2758,34 +2818,44 @@ public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
     }
 
     @Override
-    public void startStripe(StripePlanner planner) throws IOException {
-      super.startStripe(planner);
+    public void startStripe(StripePlanner planner, ReadLevel readLevel) throws IOException {
+      super.startStripe(planner, readLevel);
       lengths = createIntegerReader(planner.getEncoding(columnId).getKind(),
           planner.getStream(new StreamName(columnId,
               OrcProto.Stream.Kind.LENGTH)), false, context);
       if (keyReader != null) {
-        keyReader.startStripe(planner);
+        keyReader.startStripe(planner, readLevel);
       }
       if (valueReader != null) {
-        valueReader.startStripe(planner);
+        valueReader.startStripe(planner, readLevel);
       }
     }
 
     @Override
-    public void skipRows(long items) throws IOException {
+    public void skipRows(long items, ReadLevel readLevel) throws IOException {
       items = countNonNulls(items);
       long childSkip = 0;
       for (long i = 0; i < items; ++i) {
         childSkip += lengths.next();
       }
-      keyReader.skipRows(childSkip);
-      valueReader.skipRows(childSkip);
+      keyReader.skipRows(childSkip, readLevel);
+      valueReader.skipRows(childSkip, readLevel);
     }
   }
 
   public static TypeReader createTreeReader(TypeDescription readerType,
-                                            Context context
-                                            ) throws IOException {
+                                            Context context) throws IOException {
+    TypeReader reader = createTreeReaderInternal(readerType, context);
+    if (context.getColumnFilterCallback() == null
+        || reader instanceof NullTreeReader) {

Review comment:
       indentation?

##########
File path: java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
##########
@@ -663,41 +691,46 @@ public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
     }
 
     @Override
-    public void startStripe(StripePlanner planner) throws IOException {
-      super.startStripe(planner);
+    public void startStripe(StripePlanner planner, ReadLevel readLevel) throws IOException {
+
+      super.startStripe(planner, readLevel);
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
       reader = createIntegerReader(planner.getEncoding(columnId).getKind(),
           planner.getStream(name), true, context);
     }
 
     @Override
-    public void seek(PositionProvider[] index) throws IOException {
-      seek(index[columnId]);
+    public void seek(PositionProvider[] index, ReadLevel readLevel) throws IOException {
+      seek(index[columnId], readLevel);
     }
 
     @Override
-    public void seek(PositionProvider index) throws IOException {
-      super.seek(index);
+    public void seek(PositionProvider index, ReadLevel readLevel) throws IOException {
+
+      super.seek(index, readLevel);
       reader.seek(index);
     }
 
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
                            final int batchSize,
-                           FilterContext filterContext) throws IOException {
+                           FilterContext filterContext,
+                           ReadLevel readLevel) throws IOException {
+

Review comment:
       Totally agree -- there are empty line additions allover the PR

##########
File path: site/develop/design/lazy_filter.md
##########
@@ -0,0 +1,352 @@
+* [Lazy Filter](#LazyFilter)
+  * [Background](#Background)
+  * [Design](#Design)
+    * [SArg to Filter](#SArgtoFilter)
+    * [Read](#Read)
+  * [Configuration](#Configuration)
+  * [Tests](#Tests)
+  * [Appendix](#Appendix)
+    * [Benchmarks](#Benchmarks)
+      * [Row vs Vector](#RowvsVector)
+      * [Filter](#Filter)
+
+# Lazy Filter <a id="LazyFilter"></a>
+
+## Background <a id="Background"></a>
+
+This feature request started as a result of a large search that is performed with the following characteristics:
+
+* The search fields are not part of partition, bucket or sort fields.
+* The table is a very large table.
+* The predicates result in very few rows compared to the scan size.
+* The search columns are a significant subset of selection columns in the query.
+
+Initial analysis showed that we could have a significant benefit by lazily reading the non-search columns only when we
+have a match. We explore the design and some benchmarks in subsequent sections.
+
+## Design <a id="Design"></a>
+
+This builds further on [ORC-577][ORC-577] which currently only restricts deserialization for some selected data types
+but does not improve on IO.
+
+On a high level the design includes the following components:
+
+```text
+┌──────────────┐          ┌────────────────────────┐
+│              │          │          Read          │
+│              │          │                        │
+│              │          │     ┌────────────┐     │
+│SArg to Filter│─────────▶│     │Read Filter │     │
+│              │          │     │  Columns   │     │
+│              │          │     └────────────┘     │
+│              │          │            │           │
+└──────────────┘          │            ▼           │
+                          │     ┌────────────┐     │
+                          │     │Apply Filter│     │
+                          │     └────────────┘     │
+                          │            │           │
+                          │            ▼           │
+                          │     ┌────────────┐     │
+                          │     │Read Select │     │
+                          │     │  Columns   │     │
+                          │     └────────────┘     │
+                          │                        │
+                          │                        │
+                          └────────────────────────┘
+```
+
+* **SArg to Filter**: Converts Search Arguments passed down into filters for efficient application during scans.
+* **Read**: Performs the lazy read using the filters.
+  * **Read Filter Columns**: Read the filter columns from the file.
+  * **Apply Filter**: Apply the filter on the read filter columns.
+  * **Read Select Columns**: If filter selects at least a row then read the remaining columns.
+
+### SArg to Filter <a id="SArgtoFilter"></a>
+
+SArg to Filter converts the passed SArg into a filter. This enables automatic compatibility with both Spark and Hive as
+they already push down Search Arguments down to ORC.
+
+The SArg is automatically converted into a [Vector Filter][vfilter]. Which is applied during the read process. Two
+filter types were evaluated:
+
+* [Row Filter][rfilter] that evaluates each row across all the predicates once.
+* [Vector Filter][vfilter] that evaluates each filter across the entire vector and adjusts the subsequent evaluation.
+
+While a row based filter is easier to code, it is much [slower][rowvvector] to process. We also see a significant
+[performance gain][rowvvector] in the absence of normalization.
+
+The builder for search argument should allow skipping normalization during the [build][build]. This has already been
+proposed as part of [HIVE-24458][HIVE-24458].
+
+### Read <a id="Read"></a>
+
+The read process has the following changes:
+
+```text
+                         │
+                         │
+                         │
+┌────────────────────────▼────────────────────────┐
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃Plan ++Search++ ┃                │
+│               ┃    Columns     ┃                │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                 Read   │Stripe                  │
+└────────────────────────┼────────────────────────┘
+                         │
+                         ▼
+
+
+                         │
+                         │
+┌────────────────────────▼────────────────────────┐
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃Read ++Search++ ┃                │
+│               ┃    Columns     ┃◀─────────┐     │
+│               ┗━━━━━━━━━━━━━━━━┛          │     │
+│                        │              Size = 0  │
+│                        ▼                  │     │
+│               ┏━━━━━━━━━━━━━━━━┓          │     │
+│               ┃  Apply Filter  ┃──────────┘     │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                    Size > 0                     │
+│                        │                        │
+│                        ▼                        │
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃  Plan Select   ┃                │
+│               ┃    Columns     ┃                │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                        │                        │
+│                        ▼                        │
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃  Read Select   ┃                │
+│               ┃    Columns     ┃                │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                   Next │Batch                   │
+└────────────────────────┼────────────────────────┘
+                         │
+                         ▼
+```
+
+The read process changes:
+
+* **Read Stripe** used to plan the read of all (search + select) columns. This is enhanced to plan and fetch only the
+  search columns. The rest of the stripe planning process optimizations remain unchanged e.g. partial read planning of
+  the stripe based on RowGroup statistics.
+* **Next Batch** identifies the processing that takes place when `RecordReader.nextBatch` is invoked.
+  * **Read Search Columns** takes place instead of reading all the selected columns. This is in sync with the planning
+    that has taken place during **Read Stripe** where only the search columns have been planned.
+  * **Apply Filter** on the batch that at this point only includes search columns. Evaluate the result of the filter:
+    * **Size = 0** indicates all records have been filtered out. Given this we proceed to the next batch of search
+      columns.
+    * **Size > 0** indicates that at least one record accepted by the filter. This record needs to be substantiated with
+      other columns.
+  * **Plan Select Columns** is invoked to perform read of the select columns. The planning happens as follows:
+    * Determine the current position of the read within the stripe and plan the read for the select columns from this
+      point forward to the end of the stripe.
+    * The Read planning of select columns respects the row groups filtered out as a result of the stripe planning.
+    * Fetch the select columns using the above plan.
+  * **Read Select Columns** into the vectorized row batch
+  * Return this batch.
+
+The current implementation performs a single read for the select columns in a stripe.
+
+```text
+┌──────────────────────────────────────────────────┐
+│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
+│ │RG0 │ │RG1 │ │RG2■│ │RG3 │ │RG4 │ │RG5■│ │RG6 │ │
+│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
+│                      Stripe                      │
+└──────────────────────────────────────────────────┘
+```
+
+The above diagram depicts a stripe with 7 Row Groups out of which **RG2** and **RG5** are selected by the filter. The
+current implementation does the following:
+
+* Start the read planning process from the first match RG2
+* Read to the end of the stripe that includes RG6
+* Based on the above fetch skips RG0 and RG1 subject to compression block boundaries
+
+The above logic could be enhanced to perform say **2 or n** reads before reading to the end of stripe. The current
+implementation allows 0 reads before reading to the end of the stripe. The value of **n** could be configurable but
+should avoid too many short reads.
+
+The read behavior changes as follows with multiple reads being allowed within a stripe for select columns:
+
+```text
+┌──────────────────────────────────────────────────┐
+│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
+│ │    │ │    │ │■■■■│ │■■■■│ │■■■■│ │■■■■│ │■■■■│ │
+│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
+│              Current implementation              │
+└──────────────────────────────────────────────────┘
+┌──────────────────────────────────────────────────┐
+│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
+│ │    │ │    │ │■■■■│ │    │ │    │ │■■■■│ │■■■■│ │
+│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
+│               Allow 1 partial read               │
+└──────────────────────────────────────────────────┘
+```
+
+The figure shows that we could read significantly fewer bytes by performing an additional read before reading to the end
+of stripe. This shall be included as a subsequent enhancement to this patch.
+
+## Configuration <a id="Configuration"></a>
+
+The following configuration options are exposed that control the filter behavior:
+
+|Property                   |Type   |Default|
+|:---                       |:---   |:---   |
+|orc.sarg.to.filter         |boolean|false  |
+|orc.sarg.to.filter.selected|boolean|false  |
+
+* `orc.sarg.to.filter` can be used to turn off the SArg to filter conversion. This might be particularly relevant in 
+  cases where the filter is expensive and does not eliminate a lot of records.
+* `orc.sarg.to.filter.selected` is an important setting that if incorrectly enabled results in wrong output. The 
+  `VectorizedRowBatch` has a selected vector that defines which rows are selected. This property should be set to `true`
+  only if the consumer respects the selected vector in determining the valid rows.
+
+## Tests <a id="Tests"></a>
+
+We evaluated this patch against a search job with the following stats:
+
+* Table
+  * Size: ~**420 TB**
+  * Data fields: ~**120**
+  * Partition fields: **3**
+* Scan
+  * Search fields: 3 data fields with large (~ 1000 value) IN clauses compounded by **OR**.
+  * Select fields: 16 data fields (includes the 3 search fields), 1 partition field
+  * Search:
+    * Size: ~**180 TB**
+    * Records: **3.99 T**
+  * Selected:
+    * Size: ~**100 MB**
+    * Records: **1 M**
+
+We have observed the following reductions:
+
+|Test    |IO Reduction %|CPU Reduction %|
+|:---    |          ---:|           ---:|
+|Same    |            45|             47|
+|SELECT *|            70|             87|
+
+* The savings are more significant as you increase the number of select columns with respect to the search columns
+* When the filter selects most data, no significant penalty observed as a result of 2 IO compared with a single IO
+  * We do have a penalty as a result of the filter application on the selected records.
+
+## Appendix <a id="Appendix"></a>
+
+### Benchmarks <a id="Benchmarks"></a>
+
+#### Row vs Vector <a id="RowvsVector"></a>
+
+We start with a decision of using a Row filter vs a Vector filter. The Row filter has the advantage of simpler code vs
+the Vector filter.
+
+```bash
+java -jar target/orc-benchmarks-core-1.7.0-SNAPSHOT-uber.jar filter simple
+```
+
+|Benchmark               |(fInSize)|(fType)|Mode| Cnt| Score|Error  |Units|
+|:---                    |     ---:|:---   |:---|---:|  ---:|:---   |:--- |
+|SimpleFilterBench.filter|        4|row    |avgt|  20|52.260|± 0.109|us/op|
+|SimpleFilterBench.filter|        4|vector |avgt|  20|19.699|± 0.044|us/op|
+|SimpleFilterBench.filter|        8|row    |avgt|  20|59.648|± 0.179|us/op|
+|SimpleFilterBench.filter|        8|vector |avgt|  20|28.655|± 0.036|us/op|
+|SimpleFilterBench.filter|       16|row    |avgt|  20|56.480|± 0.190|us/op|
+|SimpleFilterBench.filter|       16|vector |avgt|  20|46.757|± 0.124|us/op|
+|SimpleFilterBench.filter|       32|row    |avgt|  20|57.780|± 0.111|us/op|
+|SimpleFilterBench.filter|       32|vector |avgt|  20|52.060|± 0.333|us/op|
+|SimpleFilterBench.filter|      256|row    |avgt|  20|50.898|± 0.275|us/op|
+|SimpleFilterBench.filter|      256|vector |avgt|  20|85.684|± 0.351|us/op|
+
+Explanation:
+
+* **fInSize** calls out the number of values in the IN clause.
+* **fType** calls out the whether the filter is a row based filter, or a vector based filter.
+
+Observations:
+
+* The vector based filter is significantly faster than the row based filter.
+  * At best, vector was faster by **59.62%**
+  * At worst, vector was faster by **32.14%**
+* The performance of the filters is deteriorates with the increase of the IN values, however even in this case the
+  vector filter is much better than the row filter.
+
+In the next test we use a complex filter with both AND, and OR to understand the impact of Conjunctive Normal Form on

Review comment:
       * evaluation happens on every VectorBatch (1024 rows by default) 

##########
File path: java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
##########
@@ -83,17 +85,21 @@
   private final boolean[] fileIncluded;
   private final long rowIndexStride;
   private long rowInStripe = 0;
+  private long followRowInStripe = 0;
   private int currentStripe = -1;
   private long rowBaseInStripe = 0;
   private long rowCountInStripe = 0;
   private final BatchReader reader;
   private final OrcIndex indexes;
+  private final boolean[] rowIndexCols;
   private final SargApplier sargApp;
   // an array about which row groups aren't skipped
   private boolean[] includedRowGroups = null;
   private final DataReader dataReader;
   private final int maxDiskRangeChunkLimit;
   private final StripePlanner planner;
+  private final ReadLevel readLevel;
+  private boolean needFollowStripe;

Review comment:
       have been read or need to be read?

##########
File path: site/develop/design/lazy_filter.md
##########
@@ -0,0 +1,352 @@
+* [Lazy Filter](#LazyFilter)
+  * [Background](#Background)
+  * [Design](#Design)
+    * [SArg to Filter](#SArgtoFilter)
+    * [Read](#Read)
+  * [Configuration](#Configuration)
+  * [Tests](#Tests)
+  * [Appendix](#Appendix)
+    * [Benchmarks](#Benchmarks)
+      * [Row vs Vector](#RowvsVector)
+      * [Filter](#Filter)
+
+# Lazy Filter <a id="LazyFilter"></a>
+
+## Background <a id="Background"></a>
+
+This feature request started as a result of a large search that is performed with the following characteristics:
+
+* The search fields are not part of partition, bucket or sort fields.
+* The table is a very large table.
+* The predicates result in very few rows compared to the scan size.
+* The search columns are a significant subset of selection columns in the query.
+
+Initial analysis showed that we could have a significant benefit by lazily reading the non-search columns only when we
+have a match. We explore the design and some benchmarks in subsequent sections.
+
+## Design <a id="Design"></a>
+
+This builds further on [ORC-577][ORC-577] which currently only restricts deserialization for some selected data types
+but does not improve on IO.
+
+On a high level the design includes the following components:
+
+```text
+┌──────────────┐          ┌────────────────────────┐
+│              │          │          Read          │
+│              │          │                        │
+│              │          │     ┌────────────┐     │
+│SArg to Filter│─────────▶│     │Read Filter │     │
+│              │          │     │  Columns   │     │
+│              │          │     └────────────┘     │
+│              │          │            │           │
+└──────────────┘          │            ▼           │
+                          │     ┌────────────┐     │
+                          │     │Apply Filter│     │
+                          │     └────────────┘     │
+                          │            │           │
+                          │            ▼           │
+                          │     ┌────────────┐     │
+                          │     │Read Select │     │
+                          │     │  Columns   │     │
+                          │     └────────────┘     │
+                          │                        │
+                          │                        │
+                          └────────────────────────┘
+```
+
+* **SArg to Filter**: Converts Search Arguments passed down into filters for efficient application during scans.
+* **Read**: Performs the lazy read using the filters.
+  * **Read Filter Columns**: Read the filter columns from the file.
+  * **Apply Filter**: Apply the filter on the read filter columns.
+  * **Read Select Columns**: If filter selects at least a row then read the remaining columns.
+
+### SArg to Filter <a id="SArgtoFilter"></a>
+
+SArg to Filter converts the passed SArg into a filter. This enables automatic compatibility with both Spark and Hive as
+they already push down Search Arguments down to ORC.
+
+The SArg is automatically converted into a [Vector Filter][vfilter]. Which is applied during the read process. Two
+filter types were evaluated:
+
+* [Row Filter][rfilter] that evaluates each row across all the predicates once.
+* [Vector Filter][vfilter] that evaluates each filter across the entire vector and adjusts the subsequent evaluation.
+
+While a row based filter is easier to code, it is much [slower][rowvvector] to process. We also see a significant
+[performance gain][rowvvector] in the absence of normalization.
+
+The builder for search argument should allow skipping normalization during the [build][build]. This has already been
+proposed as part of [HIVE-24458][HIVE-24458].
+
+### Read <a id="Read"></a>
+
+The read process has the following changes:
+
+```text
+                         │
+                         │
+                         │
+┌────────────────────────▼────────────────────────┐
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃Plan ++Search++ ┃                │
+│               ┃    Columns     ┃                │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                 Read   │Stripe                  │
+└────────────────────────┼────────────────────────┘
+                         │
+                         ▼
+
+
+                         │
+                         │
+┌────────────────────────▼────────────────────────┐
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃Read ++Search++ ┃                │
+│               ┃    Columns     ┃◀─────────┐     │
+│               ┗━━━━━━━━━━━━━━━━┛          │     │
+│                        │              Size = 0  │
+│                        ▼                  │     │
+│               ┏━━━━━━━━━━━━━━━━┓          │     │
+│               ┃  Apply Filter  ┃──────────┘     │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                    Size > 0                     │
+│                        │                        │
+│                        ▼                        │
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃  Plan Select   ┃                │
+│               ┃    Columns     ┃                │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                        │                        │
+│                        ▼                        │
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃  Read Select   ┃                │
+│               ┃    Columns     ┃                │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                   Next │Batch                   │
+└────────────────────────┼────────────────────────┘
+                         │
+                         ▼
+```
+
+The read process changes:
+
+* **Read Stripe** used to plan the read of all (search + select) columns. This is enhanced to plan and fetch only the
+  search columns. The rest of the stripe planning process optimizations remain unchanged e.g. partial read planning of
+  the stripe based on RowGroup statistics.
+* **Next Batch** identifies the processing that takes place when `RecordReader.nextBatch` is invoked.
+  * **Read Search Columns** takes place instead of reading all the selected columns. This is in sync with the planning
+    that has taken place during **Read Stripe** where only the search columns have been planned.
+  * **Apply Filter** on the batch that at this point only includes search columns. Evaluate the result of the filter:
+    * **Size = 0** indicates all records have been filtered out. Given this we proceed to the next batch of search
+      columns.
+    * **Size > 0** indicates that at least one record accepted by the filter. This record needs to be substantiated with
+      other columns.
+  * **Plan Select Columns** is invoked to perform read of the select columns. The planning happens as follows:
+    * Determine the current position of the read within the stripe and plan the read for the select columns from this
+      point forward to the end of the stripe.
+    * The Read planning of select columns respects the row groups filtered out as a result of the stripe planning.
+    * Fetch the select columns using the above plan.
+  * **Read Select Columns** into the vectorized row batch
+  * Return this batch.
+
+The current implementation performs a single read for the select columns in a stripe.
+
+```text
+┌──────────────────────────────────────────────────┐
+│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
+│ │RG0 │ │RG1 │ │RG2■│ │RG3 │ │RG4 │ │RG5■│ │RG6 │ │
+│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
+│                      Stripe                      │
+└──────────────────────────────────────────────────┘
+```
+
+The above diagram depicts a stripe with 7 Row Groups out of which **RG2** and **RG5** are selected by the filter. The
+current implementation does the following:
+
+* Start the read planning process from the first match RG2
+* Read to the end of the stripe that includes RG6
+* Based on the above fetch skips RG0 and RG1 subject to compression block boundaries
+
+The above logic could be enhanced to perform say **2 or n** reads before reading to the end of stripe. The current
+implementation allows 0 reads before reading to the end of the stripe. The value of **n** could be configurable but
+should avoid too many short reads.
+
+The read behavior changes as follows with multiple reads being allowed within a stripe for select columns:
+
+```text
+┌──────────────────────────────────────────────────┐
+│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
+│ │    │ │    │ │■■■■│ │■■■■│ │■■■■│ │■■■■│ │■■■■│ │
+│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
+│              Current implementation              │
+└──────────────────────────────────────────────────┘
+┌──────────────────────────────────────────────────┐
+│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
+│ │    │ │    │ │■■■■│ │    │ │    │ │■■■■│ │■■■■│ │
+│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
+│               Allow 1 partial read               │
+└──────────────────────────────────────────────────┘
+```
+
+The figure shows that we could read significantly fewer bytes by performing an additional read before reading to the end

Review comment:
       Is partial read conf part of ORC-743? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org