Posted to commits@hive.apache.org by pv...@apache.org on 2020/07/20 09:32:47 UTC

[hive] branch master updated: HIVE-23840: Use LLAP to get orc metadata (Peter Vary reviewed by Adam Szita)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new fa086ec  HIVE-23840: Use LLAP to get orc metadata (Peter Vary reviewed by Adam Szita)
fa086ec is described below

commit fa086ecce543384993a61d5154b14fa3b80df3b5
Author: pvary <pv...@cloudera.com>
AuthorDate: Mon Jul 20 11:32:29 2020 +0200

    HIVE-23840: Use LLAP to get orc metadata (Peter Vary reviewed by Adam Szita)
    
    Closes (#1251)
---
 .../hive/llap/io/api/impl/LlapRecordReader.java    |   2 +-
 .../org/apache/hadoop/hive/llap/LlapHiveUtils.java |   7 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    |   5 +
 .../hadoop/hive/ql/io/orc/OrcRawRecordMerger.java  |   4 +
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java    |  29 ++-
 .../org/apache/hadoop/hive/ql/io/orc/OrcSplit.java |   3 +-
 .../ql/io/orc/VectorizedOrcAcidRowBatchReader.java | 255 ++++++++++++++-------
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java     |  47 ++++
 .../orc/TestVectorizedOrcAcidRowBatchReader.java   |  63 ++++-
 9 files changed, 321 insertions(+), 94 deletions(-)

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 417a42a..c148dd4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -191,7 +191,7 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
     if (isAcidScan) {
       OrcSplit orcSplit = (OrcSplit) split;
       this.acidReader = new VectorizedOrcAcidRowBatchReader(
-          orcSplit, jobConf, Reporter.NULL, null, rbCtx, true);
+          orcSplit, jobConf, Reporter.NULL, null, rbCtx, true, mapWork);
       isAcidFormat = !orcSplit.isOriginal();
     } else {
       isAcidFormat = false;
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
index 2779969..88834ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Map;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -105,7 +106,7 @@ public final class LlapHiveUtils {
    * @return the MapWork instance. Might be null if missing.
    * @throws HiveException
    */
-  public static MapWork findMapWork(JobConf job) throws HiveException {
+  public static MapWork findMapWork(JobConf job) {
     String inputName = job.get(Utilities.INPUT_NAME, null);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Initializing for input " + inputName);
@@ -142,8 +143,8 @@ public final class LlapHiveUtils {
     }
   }
 
-  public static boolean isLlapMode(HiveConf conf) {
-    return "llap".equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_MODE));
+  public static boolean isLlapMode(Configuration conf) {
+    return "llap".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
   }
 
 }
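
The isLlapMode() change above switches to the static HiveConf.getVar(Configuration, ConfVars) accessor, so callers that only hold a plain Configuration or JobConf (such as the ACID batch reader further down) can perform the check without a HiveConf instance. A minimal usage sketch; the wrapper class and main() are illustrative, not part of the patch:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.llap.LlapHiveUtils;
    import org.apache.hadoop.mapred.JobConf;

    // Illustrative usage only; the wrapper class is not part of the patch.
    public class LlapModeCheckSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        job.set(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname, "llap");
        // A JobConf is a Configuration, so it can now be passed straight to the check:
        System.out.println(LlapHiveUtils.isLlapMode(job)); // prints "true"
      }
    }
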
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 85d8531..add20ec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -505,15 +505,18 @@ public class AcidUtils {
     if (ORIGINAL_PATTERN.matcher(filename).matches() || ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
       long minWriteId = 0;
       long maxWriteId = 0;
+      int statementId = -1;
       if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) {
         ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), false);
         minWriteId = parsedDelta.getMinWriteId();
         maxWriteId = parsedDelta.getMaxWriteId();
+        statementId = parsedDelta.getStatementId();
       }
       result
           .setOldStyle(true)
           .minimumWriteId(minWriteId)
           .maximumWriteId(maxWriteId)
+          .statementId(statementId)
           .bucket(bucket)
           .writingBase(!bucketFile.getParent().getName().startsWith(DELTA_PREFIX));
     }
@@ -531,6 +534,7 @@ public class AcidUtils {
             .setOldStyle(false)
             .minimumWriteId(parsedDelta.minWriteId)
             .maximumWriteId(parsedDelta.maxWriteId)
+            .statementId(parsedDelta.statementId)
             .bucket(bucket)
             .attemptId(attemptId);
       } else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) {
@@ -539,6 +543,7 @@ public class AcidUtils {
             .setOldStyle(false)
             .minimumWriteId(parsedDelta.minWriteId)
             .maximumWriteId(parsedDelta.maxWriteId)
+            .statementId(parsedDelta.statementId)
             .bucket(bucket);
       }
     } else {
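
The AcidUtils changes above thread the statementId parsed from a delta directory name into the returned AcidOutputFormat.Options, which the delete-delta filtering below compares against the statement id of delete delta directories. Delta directories follow the delta_<minWriteId>_<maxWriteId> or delta_<minWriteId>_<maxWriteId>_<statementId> pattern, and delete_delta_* has the same shape. A rough sketch of that naming convention (ignoring compaction's _v<visibilityTxnId> suffix); the real parser is AcidUtils.parsedDelta():

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Illustrative sketch of the delta directory naming convention; not the AcidUtils implementation.
    public class DeltaNameSketch {
      // delta_0000002_0000002 or delta_0000002_0000002_0001 (delete_delta_* follows the same shape)
      private static final Pattern DELTA =
          Pattern.compile("(?:delete_)?delta_(\\d+)_(\\d+)(?:_(\\d+))?");

      public static void main(String[] args) {
        for (String dir : new String[] {"delta_0000002_0000002", "delete_delta_0000002_0000005_0001"}) {
          Matcher m = DELTA.matcher(dir);
          if (m.matches()) {
            long minWriteId = Long.parseLong(m.group(1));
            long maxWriteId = Long.parseLong(m.group(2));
            int statementId = m.group(3) == null ? -1 : Integer.parseInt(m.group(3)); // -1 = not present
            System.out.println(dir + " -> min=" + minWriteId + " max=" + maxWriteId + " stmt=" + statementId);
          }
        }
      }
    }
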
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 6739a2a..ee31e6b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -695,6 +695,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       return Objects.hash(minKey, maxKey);
     }
 
+    public boolean isIntersects(KeyInterval other) {
+      return (minKey == null || other.maxKey == null || minKey.compareTo(other.maxKey) <= 0) &&
+          (maxKey == null || other.minKey == null || maxKey.compareTo(other.minKey) >= 0);
+    }
   }
   /**
    * Find the key range for original bucket files.
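
The new KeyInterval.isIntersects() above treats a null bound as unbounded, so two intervals are considered disjoint only when one provably ends before the other begins. The same predicate over plain row ids, as a standalone sketch (Long stands in for RecordIdentifier, null for an open end):

    // Minimal sketch of the intersection rule used by KeyInterval.isIntersects above,
    // with Long row ids standing in for RecordIdentifier keys; null means "unbounded".
    public class IntervalSketch {
      static boolean intersects(Long min1, Long max1, Long min2, Long max2) {
        return (min1 == null || max2 == null || min1 <= max2)
            && (max1 == null || min2 == null || max1 >= min2);
      }

      public static void main(String[] args) {
        System.out.println(intersects(1000L, 2000L, 1500L, 2500L)); // true  - overlapping
        System.out.println(intersects(1000L, 2000L, 500L, 999L));   // false - disjoint
        System.out.println(intersects(1000L, 2000L, 2500L, null));  // false - starts after the end
        System.out.println(intersects(1000L, 2000L, null, null));   // true  - unbounded always intersects
      }
    }
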
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index d0c8006..6477c96 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -51,9 +51,11 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.orc.OrcConf;
+import org.apache.orc.OrcProto;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.impl.AcidStats;
 import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.orc.impl.OrcTail;
 import org.apache.orc.impl.SchemaEvolution;
 import org.apache.orc.impl.WriterImpl;
 import org.slf4j.Logger;
@@ -648,15 +650,28 @@ public class OrcRecordUpdater implements RecordUpdater {
   }
 
   static RecordIdentifier[] parseKeyIndex(Reader reader) {
-    String[] stripes;
-    try {
-      if (!reader.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)) {
-        return null;
+    if (!reader.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)) {
+      return null;
+    }
+
+    ByteBuffer val =
+        reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)
+            .duplicate();
+    return parseKeyIndex(val);
+  }
+
+  static RecordIdentifier[] parseKeyIndex(OrcTail orcTail) {
+    for(OrcProto.UserMetadataItem item: orcTail.getFooter().getMetadataList()) {
+      if (item.hasName() && item.getName().equals(OrcRecordUpdater.ACID_KEY_INDEX_NAME)) {
+        return parseKeyIndex(item.getValue().asReadOnlyByteBuffer().duplicate());
       }
+    }
+    return null;
+  }
 
-      ByteBuffer val =
-          reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)
-              .duplicate();
+  private static RecordIdentifier[] parseKeyIndex(ByteBuffer val) {
+    String[] stripes;
+    try {
       CharsetDecoder utf8Decoder = UTF8.newDecoder();
       stripes = utf8Decoder.decode(val).toString().split(";");
     } catch (CharacterCodingException e) {
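
The parseKeyIndex() refactor above lets the hive.acid.key.index metadata be read either from an open Reader or straight from an OrcTail's footer user-metadata list, which is what the cached-tail path needs. The value is a UTF-8 string with one entry per stripe separated by ';', and the surrounding code parses each entry as a writeId,bucketProperty,rowId triple. A small decoding sketch under that assumption (not the OrcRecordUpdater code; the sample value is made up):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Illustrative decoding of an ACID key-index value of the per-stripe shape
    // "<writeId>,<bucketProperty>,<rowId>;..."; OrcRecordUpdater.parseKeyIndex is the real parser.
    public class KeyIndexSketch {
      public static void main(String[] args) {
        ByteBuffer val = ByteBuffer.wrap("1,536870912,29;1,536870912,59;".getBytes(StandardCharsets.UTF_8));
        String[] stripes = StandardCharsets.UTF_8.decode(val).toString().split(";");
        for (String stripe : stripes) {
          String[] parts = stripe.split(",");
          long writeId = Long.parseLong(parts[0]);
          int bucketProperty = Integer.parseInt(parts[1]);
          long rowId = Long.parseLong(parts[2]);
          System.out.println("last key in stripe: " + writeId + "/" + bucketProperty + "/" + rowId);
        }
      }
    }
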
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index e71dc7d..25da7a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -346,8 +346,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
         OrcRawRecordMerger.TransactionMetaData.findWriteIDForSynthetcRowIDs(getPath(), rootPath, conf);
     writeId = tmd.syntheticWriteId;
     stmtId = tmd.statementId;
-    AcidOutputFormat.Options opt = AcidUtils.parseBaseOrDeltaBucketFilename(getPath(), conf);
-    bucketId = opt.getBucketId();
+    bucketId = AcidUtils.parseBucketId(getPath());
   }
 
   @Override
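
OrcSplit now derives its bucketId with AcidUtils.parseBucketId(getPath()) instead of going through parseBaseOrDeltaBucketFilename(), which re-parses the whole base/delta file name and needs the configuration. ACID bucket files are named bucket_<NNNNN> (optionally with an attempt suffix), while pre-ACID "original" files look like 000000_0 or 000000_0_copy_1. A rough, illustrative extraction of the bucket number from either shape; the real logic lives in AcidUtils.parseBucketId:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Rough sketch only; AcidUtils.parseBucketId handles the real cases.
    public class BucketIdSketch {
      private static final Pattern ACID_BUCKET = Pattern.compile("bucket_(\\d+)(?:_\\d+)?");
      private static final Pattern ORIGINAL = Pattern.compile("(\\d+)_\\d+(?:_copy_\\d+)?");

      static int parseBucketId(String fileName) {
        Matcher m = ACID_BUCKET.matcher(fileName);
        if (m.matches()) {
          return Integer.parseInt(m.group(1));
        }
        m = ORIGINAL.matcher(fileName);
        return m.matches() ? Integer.parseInt(m.group(1)) : -1;
      }

      public static void main(String[] args) {
        System.out.println(parseBucketId("bucket_00001"));      // 1
        System.out.println(parseBucketId("000000_0"));          // 0
        System.out.println(parseBucketId("000000_0_copy_1"));   // 0
      }
    }
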
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 832ce19..d0cc983 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -25,8 +25,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.IllegalCacheConfigurationException;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -42,6 +46,8 @@ import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
@@ -50,9 +56,10 @@ import org.apache.hive.common.util.Ref;
 import org.apache.orc.ColumnStatistics;
 import org.apache.orc.IntegerColumnStatistics;
 import org.apache.orc.OrcConf;
+import org.apache.orc.OrcProto;
 import org.apache.orc.StripeInformation;
-import org.apache.orc.StripeStatistics;
-import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.orc.impl.ColumnStatisticsImpl;
+import org.apache.orc.impl.OrcTail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,6 +136,16 @@ public class VectorizedOrcAcidRowBatchReader
    */
   private SearchArgument deleteEventSarg = null;
 
+  /**
+   * CacheTag associated with the split.
+   */
+  private final CacheTag cacheTag;
+
+  /**
+   * Skip using the LLAP IO cache for checking delete_delta files if the configuration is not correct.
+   */
+  private static boolean skipLlapCache = false;
+
   //OrcInputFormat c'tor
   VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf,
                                   Reporter reporter) throws IOException {
@@ -138,7 +155,7 @@ public class VectorizedOrcAcidRowBatchReader
   VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf,
         Reporter reporter, VectorizedRowBatchCtx rbCtx) throws IOException {
     this(conf, inputSplit, reporter,
-        rbCtx == null ? Utilities.getVectorizedRowBatchCtx(conf) : rbCtx, false);
+        rbCtx == null ? Utilities.getVectorizedRowBatchCtx(conf) : rbCtx, false, null);
 
     final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, inputSplit);
     // Careful with the range here now, we do not want to read the whole base file like deltas.
@@ -189,15 +206,15 @@ public class VectorizedOrcAcidRowBatchReader
    */
   public VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf, Reporter reporter,
     org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> baseReader,
-    VectorizedRowBatchCtx rbCtx, boolean isFlatPayload) throws IOException {
-    this(conf, inputSplit, reporter, rbCtx, isFlatPayload);
+    VectorizedRowBatchCtx rbCtx, boolean isFlatPayload, MapWork mapWork) throws IOException {
+    this(conf, inputSplit, reporter, rbCtx, isFlatPayload, mapWork);
     if (baseReader != null) {
       setBaseAndInnerReader(baseReader);
     }
   }
 
   private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporter reporter,
-      VectorizedRowBatchCtx rowBatchCtx, boolean isFlatPayload) throws IOException {
+      VectorizedRowBatchCtx rowBatchCtx, boolean isFlatPayload, MapWork mapWork) throws IOException {
     this.isFlatPayload = isFlatPayload;
     this.rbCtx = rowBatchCtx;
     final boolean isAcidRead = AcidUtils.isFullAcidScan(conf);
@@ -232,6 +249,19 @@ public class VectorizedOrcAcidRowBatchReader
 
     this.syntheticProps = orcSplit.getSyntheticAcidProps();
 
+    if (LlapHiveUtils.isLlapMode(conf) && LlapProxy.isDaemon()
+            && HiveConf.getBoolVar(conf, ConfVars.LLAP_TRACK_CACHE_USAGE))
+    {
+      if (mapWork == null) {
+        mapWork = LlapHiveUtils.findMapWork(conf);
+      }
+      PartitionDesc partitionDesc =
+          LlapHiveUtils.partitionDescForPath(orcSplit.getPath(), mapWork.getPathToPartitionInfo());
+      cacheTag = LlapHiveUtils.getDbAndTableNameForMetrics(orcSplit.getPath(), true, partitionDesc);
+    } else {
+      cacheTag = null;
+    }
+
     // Clone readerOptions for deleteEvents.
     Reader.Options deleteEventReaderOptions = readerOptions.clone();
     // Set the range on the deleteEventReaderOptions to 0 to INTEGER_MAX because
@@ -244,7 +274,7 @@ public class VectorizedOrcAcidRowBatchReader
       // See if we can load all the relevant delete events from all the
       // delete deltas in memory...
       der = new ColumnizedDeleteEventRegistry(conf, orcSplit,
-          deleteEventReaderOptions, keyInterval);
+          deleteEventReaderOptions, keyInterval, cacheTag);
     } catch (DeleteEventsOverflowMemoryException e) {
       // If not, then create a set of hanging readers that do sort-merge to find the next smallest
       // delete event on-demand. Caps the memory consumption to (some_const * no. of readers).
@@ -403,11 +433,7 @@ public class VectorizedOrcAcidRowBatchReader
       return new OrcRawRecordMerger.KeyInterval(null, null);
     }
 
-    //todo: since we already have OrcSplit.orcTail, should somehow use it to
-    // get the acid.index, stats, etc rather than fetching the footer again
-    // though it seems that orcTail is mostly null....
-    Reader reader = OrcFile.createReader(orcSplit.getPath(),
-        OrcFile.readerOptions(conf));
+    OrcTail orcTail = getOrcTail(orcSplit.getPath(), conf, cacheTag).orcTail;
 
     if(orcSplit.isOriginal()) {
       /**
@@ -421,10 +447,10 @@ public class VectorizedOrcAcidRowBatchReader
        * Reader.Options, Configuration, OrcRawRecordMerger.Options)}*/
       LOG.debug("findMinMaxKeys(original split)");
 
-      return findOriginalMinMaxKeys(orcSplit, reader, deleteEventReaderOptions);
+      return findOriginalMinMaxKeys(orcSplit, orcTail, deleteEventReaderOptions);
     }
 
-    List<StripeInformation> stripes = reader.getStripes();
+    List<StripeInformation> stripes = orcTail.getStripes();
     final long splitStart = orcSplit.getStart();
     final long splitEnd = splitStart + orcSplit.getLength();
     int firstStripeIndex = -1;
@@ -480,7 +506,7 @@ public class VectorizedOrcAcidRowBatchReader
           lastStripeIndex + ")");
       return new OrcRawRecordMerger.KeyInterval(null, null);
     }
-    RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(reader);
+    RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(orcTail);
 
     if(keyIndex == null) {
       LOG.warn("Could not find keyIndex (" + firstStripeIndex + "," +
@@ -500,12 +526,12 @@ public class VectorizedOrcAcidRowBatchReader
      * are actually computed.  Streaming ingest used to set it 0 and Minor
      * compaction so there are lots of legacy files with no (rather, bad)
      * column stats*/
-    boolean columnStatsPresent = reader.getRowIndexStride() > 0;
+    boolean columnStatsPresent = orcTail.getFooter().getRowIndexStride() > 0;
     if(!columnStatsPresent) {
       LOG.debug("findMinMaxKeys() No ORC column stats");
     }
 
-    List<StripeStatistics> stats = reader.getStripeStatistics();
+    List<OrcProto.StripeStatistics> stats = orcTail.getStripeStatisticsProto();
     assert stripes.size() == stats.size() : "str.s=" + stripes.size() +
         " sta.s=" + stats.size();
 
@@ -518,29 +544,7 @@ public class VectorizedOrcAcidRowBatchReader
     }
     else {
       if(columnStatsPresent) {
-        ColumnStatistics[] colStats =
-            stats.get(firstStripeIndex).getColumnStatistics();
-        /*
-        Structure in data is like this:
-         <op, owid, writerId, rowid, cwid, <f1, ... fn>>
-        The +1 is to account for the top level struct which has a
-        ColumnStatistics object in colsStats.  Top level struct is normally
-        dropped by the Reader (I guess because of orc.impl.SchemaEvolution)
-        */
-        IntegerColumnStatistics origWriteId = (IntegerColumnStatistics)
-            colStats[OrcRecordUpdater.ORIGINAL_WRITEID + 1];
-        IntegerColumnStatistics bucketProperty = (IntegerColumnStatistics)
-            colStats[OrcRecordUpdater.BUCKET + 1];
-        IntegerColumnStatistics rowId = (IntegerColumnStatistics)
-            colStats[OrcRecordUpdater.ROW_ID + 1];
-        //we may want to change bucketProperty from int to long in the
-        // future(across the stack) this protects the following cast to int
-        assert bucketProperty.getMinimum() <= Integer.MAX_VALUE :
-            "was bucketProperty changed to a long (" +
-                bucketProperty.getMinimum() + ")?!:" + orcSplit;
-        //this a lower bound but not necessarily greatest lower bound
-        minKey = new RecordIdentifier(origWriteId.getMinimum(),
-            (int) bucketProperty.getMinimum(), rowId.getMinimum());
+        minKey = getKeyInterval(stats.get(firstStripeIndex).getColStatsList()).getMinKey();
       }
     }
 
@@ -550,22 +554,7 @@ public class VectorizedOrcAcidRowBatchReader
       maxKey = keyIndex[lastStripeIndex];
     } else {
       if(columnStatsPresent) {
-        ColumnStatistics[] colStats =
-            stats.get(lastStripeIndex).getColumnStatistics();
-        IntegerColumnStatistics origWriteId = (IntegerColumnStatistics)
-            colStats[OrcRecordUpdater.ORIGINAL_WRITEID + 1];
-        IntegerColumnStatistics bucketProperty = (IntegerColumnStatistics)
-            colStats[OrcRecordUpdater.BUCKET + 1];
-        IntegerColumnStatistics rowId = (IntegerColumnStatistics)
-            colStats[OrcRecordUpdater.ROW_ID + 1];
-
-        assert bucketProperty.getMaximum() <= Integer.MAX_VALUE :
-            "was bucketProperty changed to a long (" +
-                bucketProperty.getMaximum() + ")?!:" + orcSplit;
-
-        // this is an upper bound but not necessarily the least upper bound
-        maxKey = new RecordIdentifier(origWriteId.getMaximum(),
-            (int) bucketProperty.getMaximum(), rowId.getMaximum());
+        maxKey = getKeyInterval(stats.get(lastStripeIndex).getColStatsList()).getMaxKey();
       }
     }
     OrcRawRecordMerger.KeyInterval keyInterval =
@@ -593,23 +582,18 @@ public class VectorizedOrcAcidRowBatchReader
        * writeId is the same in both cases
        */
       for(int i = firstStripeIndex; i <= lastStripeIndex; i++) {
-        ColumnStatistics[] colStats = stats.get(firstStripeIndex)
-            .getColumnStatistics();
-        IntegerColumnStatistics bucketProperty = (IntegerColumnStatistics)
-            colStats[OrcRecordUpdater.BUCKET + 1];
-        IntegerColumnStatistics rowId = (IntegerColumnStatistics)
-            colStats[OrcRecordUpdater.ROW_ID + 1];
-        if(bucketProperty.getMinimum() < minBucketProp) {
-          minBucketProp = bucketProperty.getMinimum();
+        OrcRawRecordMerger.KeyInterval key = getKeyInterval(stats.get(i).getColStatsList());
+        if(key.getMinKey().getBucketProperty() < minBucketProp) {
+          minBucketProp = key.getMinKey().getBucketProperty();
         }
-        if(bucketProperty.getMaximum() > maxBucketProp) {
-          maxBucketProp = bucketProperty.getMaximum();
+        if(key.getMaxKey().getBucketProperty() > maxBucketProp) {
+          maxBucketProp = key.getMaxKey().getBucketProperty();
         }
-        if(rowId.getMinimum() < minRowId) {
-          minRowId = rowId.getMinimum();
+        if(key.getMinKey().getRowId() < minRowId) {
+          minRowId = key.getMinKey().getRowId();
         }
-        if(rowId.getMaximum() > maxRowId) {
-          maxRowId = rowId.getMaximum();
+        if(key.getMaxKey().getRowId() > maxRowId) {
+          maxRowId = key.getMaxKey().getRowId();
         }
       }
     }
@@ -623,7 +607,7 @@ public class VectorizedOrcAcidRowBatchReader
     return keyInterval;
   }
 
-  private OrcRawRecordMerger.KeyInterval findOriginalMinMaxKeys(OrcSplit orcSplit, Reader reader,
+  private OrcRawRecordMerger.KeyInterval findOriginalMinMaxKeys(OrcSplit orcSplit, OrcTail orcTail,
       Reader.Options deleteEventReaderOptions) {
 
     // This method returns the minimum and maximum synthetic row ids that are present in this split
@@ -641,7 +625,7 @@ public class VectorizedOrcAcidRowBatchReader
     long minRowId = syntheticProps.getRowIdOffset();
     long maxRowId = syntheticProps.getRowIdOffset();
 
-    for(StripeInformation stripe: reader.getStripes()) {
+    for(StripeInformation stripe: orcTail.getStripes()) {
       if (splitStart > stripe.getOffset()) {
         // This stripe starts before the current split starts. This stripe is not included in this split.
         minRowId += stripe.getNumberOfRows();
@@ -685,6 +669,36 @@ public class VectorizedOrcAcidRowBatchReader
     return keyIntervalTmp;
   }
 
+  private static class ReaderData {
+    OrcTail orcTail;
+    Reader reader;
+  }
+
+  /**
+   * Gets the OrcTail from the LLAP IO cache if LLAP is enabled, otherwise creates a Reader to read the tail.
+   * If a Reader had to be created, it is returned as well so it does not have to be recreated later.
+   * @param path The ORC file path we want to get the OrcTail for
+   * @param conf The Configuration used to access LLAP
+   * @param cacheTag The CacheTag needed to get the OrcTail from the LLAP IO cache
+   * @return A ReaderData object whose orcTail is never null. The reader can be null, but if one had to be
+   * created it is returned as well for further reuse.
+   */
+  private static ReaderData getOrcTail(Path path, Configuration conf, CacheTag cacheTag) throws IOException {
+    ReaderData readerData = new ReaderData();
+    if (!skipLlapCache && LlapHiveUtils.isLlapMode(conf) && LlapProxy.isDaemon()) {
+      try {
+        readerData.orcTail = LlapProxy.getIo().getOrcTailFromCache(path, conf, cacheTag);
+      } catch (IllegalCacheConfigurationException icce) {
+        LOG.warn("Cache is not usable. Please fix the configuration", icce);
+        skipLlapCache = true;
+      }
+    } else {
+      readerData.reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+      readerData.orcTail = new OrcTail(readerData.reader.getFileTail(), readerData.reader.getSerializedFileFooter());
+    }
+    return readerData;
+  }
+
   /**
    * See {@link #next(NullWritable, VectorizedRowBatch)} first and
    * {@link OrcRawRecordMerger.OriginalReaderPair}.
@@ -1543,7 +1557,7 @@ public class VectorizedOrcAcidRowBatchReader
 
     ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit,
         Reader.Options readerOptions,
-        OrcRawRecordMerger.KeyInterval keyInterval)
+        OrcRawRecordMerger.KeyInterval keyInterval, CacheTag cacheTag)
         throws IOException, DeleteEventsOverflowMemoryException {
       this.testMode = conf.getBoolean(ConfVars.HIVE_IN_TEST.varname, false);
       int bucket = AcidUtils.parseBucketId(orcSplit.getPath());
@@ -1562,20 +1576,31 @@ public class VectorizedOrcAcidRowBatchReader
       try {
         final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit);
         if (deleteDeltaDirs.length > 0) {
+          AcidOutputFormat.Options orcSplitMinMaxWriteIds =
+              AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf);
           int totalDeleteEventCount = 0;
           for (Path deleteDeltaDir : deleteDeltaDirs) {
-            FileSystem fs = deleteDeltaDir.getFileSystem(conf);
+            if (!isQualifiedDeleteDeltaForSplit(orcSplitMinMaxWriteIds, deleteDeltaDir)) {
+              continue;
+            }
             Path[] deleteDeltaFiles = OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket,
                 new OrcRawRecordMerger.Options().isCompacting(false), null);
             for (Path deleteDeltaFile : deleteDeltaFiles) {
               try {
-                /**
-                 * todo: we have OrcSplit.orcTail so we should be able to get stats from there
-                 */
-                Reader deleteDeltaReader = OrcFile.createReader(deleteDeltaFile, OrcFile.readerOptions(conf));
-                if (deleteDeltaReader.getNumberOfRows() <= 0) {
+                ReaderData readerData = getOrcTail(deleteDeltaFile, conf, cacheTag);
+                OrcTail orcTail = readerData.orcTail;
+                if (orcTail.getFooter().getNumberOfRows() <= 0) {
                   continue; // just a safe check to ensure that we are not reading empty delete files.
                 }
+                OrcRawRecordMerger.KeyInterval deleteKeyInterval = findDeleteMinMaxKeys(orcTail, deleteDeltaFile);
+                if (!deleteKeyInterval.isIntersects(keyInterval)) {
+                  // If there is no intersection between data and delete delta, do not read delete file
+                  continue;
+                }
+                // Reader can be reused if it was created before for getting orcTail: mostly for non-LLAP cache cases.
+                // For LLAP cases we need to create it here.
+                Reader deleteDeltaReader = readerData.reader != null ? readerData.reader :
+                    OrcFile.createReader(deleteDeltaFile, OrcFile.readerOptions(conf));
                 totalDeleteEventCount += deleteDeltaReader.getNumberOfRows();
                 DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
                     deleteDeltaFile, readerOptions, bucket, validWriteIdList, isBucketedTable, conf,
@@ -1601,6 +1626,48 @@ public class VectorizedOrcAcidRowBatchReader
         throw e; // rethrow the exception so that the caller can handle.
       }
     }
+
+    private static OrcRawRecordMerger.KeyInterval findDeleteMinMaxKeys(OrcTail orcTail, Path path) {
+      boolean columnStatsPresent = orcTail.getFooter().getRowIndexStride() > 0;
+      if(!columnStatsPresent) {
+        LOG.debug("findDeleteMinMaxKeys() No ORC column stats");
+        return new OrcRawRecordMerger.KeyInterval(null, null);
+      }
+
+      return getKeyInterval(orcTail.getFooter().getStatisticsList());
+    }
+
+    /**
+     * Check if the delete delta folder needs to be scanned for a given split's min/max write ids.
+     *
+     * @param orcSplitMinMaxWriteIds write id range (and statement id) parsed from the split's file path
+     * @param deleteDeltaDir the delete delta directory to check
+     * @return true when the delete delta dir has to be scanned.
+     */
+    @VisibleForTesting
+    protected static boolean isQualifiedDeleteDeltaForSplit(AcidOutputFormat.Options orcSplitMinMaxWriteIds,
+        Path deleteDeltaDir)
+    {
+      AcidUtils.ParsedDelta deleteDelta = AcidUtils.parsedDelta(deleteDeltaDir, false);
+      // We allow equal writeIds so we are prepared for multi-statement transactions.
+      // In this case we have to check the stmt id.
+      if (orcSplitMinMaxWriteIds.getMinimumWriteId() == deleteDelta.getMaxWriteId()) {
+        int orcSplitStmtId = orcSplitMinMaxWriteIds.getStatementId();
+        int deltaStmtId = deleteDelta.getStatementId();
+        // StatementId -1 and 0 are both used as the default when it is not provided.
+        // Not brave enough to fix it generally, so just fix it here.
+        if (orcSplitStmtId == -1) {
+          orcSplitStmtId = 0;
+        }
+        if (deltaStmtId == -1) {
+          deltaStmtId = 0;
+        }
+        return orcSplitStmtId < deltaStmtId;
+      }
+      // For delta_0000012_0000014_0000, no need to read delete delta folders whose maxWriteId is below 12.
+      return orcSplitMinMaxWriteIds.getMinimumWriteId() < deleteDelta.getMaxWriteId();
+    }
+
     private void checkSize(int index) throws DeleteEventsOverflowMemoryException {
       if(index > maxEventsInMemory) {
         //check to prevent OOM errors
@@ -1790,4 +1857,34 @@ public class VectorizedOrcAcidRowBatchReader
   SearchArgument getDeleteEventSarg() {
      return deleteEventSarg;
   }
+
+  private static IntegerColumnStatistics deserializeIntColumnStatistics(List<OrcProto.ColumnStatistics> colStats, int id) {
+    return (IntegerColumnStatistics) ColumnStatisticsImpl.deserialize(null, colStats.get(id));
+  }
+
+  /**
+   * Calculates the min/max record key.
+   * Structure in data is like this:
+   * <op, owid, writerId, rowid, cwid, <f1, ... fn>>
+   * The +1 is to account for the top level struct which has a
+   * ColumnStatistics object in colsStats.  Top level struct is normally
+   * dropped by the Reader (I guess because of orc.impl.SchemaEvolution)
+   * @param colStats The per-column statistics list
+   * @return The KeyInterval spanning the min and max record keys
+   */
+  private static OrcRawRecordMerger.KeyInterval getKeyInterval(List<OrcProto.ColumnStatistics> colStats) {
+    IntegerColumnStatistics origWriteId = deserializeIntColumnStatistics(colStats, OrcRecordUpdater.ORIGINAL_WRITEID + 1);
+    IntegerColumnStatistics bucketProperty = deserializeIntColumnStatistics(colStats, OrcRecordUpdater.BUCKET + 1);
+    IntegerColumnStatistics rowId = deserializeIntColumnStatistics(colStats, OrcRecordUpdater.ROW_ID + 1);
+
+    // We may want to change bucketProperty from int to long in the future (across the stack);
+    // this protects the following cast to int.
+    assert bucketProperty.getMaximum() <= Integer.MAX_VALUE :
+        "was bucketProperty (max) changed to a long (" + bucketProperty.getMaximum() + ")?!";
+    assert bucketProperty.getMinimum() <= Integer.MAX_VALUE :
+        "was bucketProperty (min) changed to a long (" + bucketProperty.getMinimum() + ")?!";
+    RecordIdentifier maxKey = new RecordIdentifier(origWriteId.getMaximum(), (int) bucketProperty.getMaximum(), rowId.getMaximum());
+    RecordIdentifier minKey = new RecordIdentifier(origWriteId.getMinimum(), (int) bucketProperty.getMinimum(), rowId.getMinimum());
+    return new OrcRawRecordMerger.KeyInterval(minKey, maxKey);
+  }
 }
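
The heart of the change is getOrcTail() above: inside an LLAP daemon the ORC file tail (footer, stripe statistics, acid key index) is served from the LLAP IO metadata cache, so delete delta files whose key range cannot intersect the split are never opened at all; outside LLAP the tail is read once and the Reader is kept in ReaderData for reuse. A condensed restatement of that fallback; the LLAP and ORC calls are the ones used in the patch, while the wrapper class and method are illustrative:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.io.CacheTag;
    import org.apache.hadoop.hive.llap.LlapHiveUtils;
    import org.apache.hadoop.hive.llap.io.api.LlapProxy;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.Reader;
    import org.apache.orc.impl.OrcTail;

    // Condensed restatement of the getOrcTail() fallback above; not a drop-in replacement.
    public class OrcTailFetchSketch {
      static OrcTail fetchTail(Path path, Configuration conf, CacheTag cacheTag) throws IOException {
        if (LlapHiveUtils.isLlapMode(conf) && LlapProxy.isDaemon()) {
          // Inside an LLAP daemon: the footer, stripe statistics and user metadata come from the
          // LLAP IO metadata cache, so the file itself is not opened at this point.
          return LlapProxy.getIo().getOrcTailFromCache(path, conf, cacheTag);
        }
        // Outside LLAP: read the file tail once and wrap it; the Reader could be kept for reuse,
        // as ReaderData does in the patch.
        Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
        return new OrcTail(reader.getFileTail(), reader.getSerializedFileFooter());
      }
    }
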
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index e6fae44..99d5fe6 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -74,6 +74,7 @@ import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
@@ -115,6 +116,52 @@ public class TestOrcRawRecordMerger {
     assertEquals(false, left.equals(ri));
   }
 
+  @Test
+  public void testIntersect() {
+    OrcRawRecordMerger.KeyInterval ki1 = generateKeyInterval(1000L, 2000L);
+    OrcRawRecordMerger.KeyInterval ki2 = generateKeyInterval(1500L, 2500L);
+    checkIntersect(ki1, ki2, true);
+
+    ki2 = generateKeyInterval(500L, 1000L);
+    checkIntersect(ki1, ki2, true);
+
+    ki2 = generateKeyInterval(500L, 999L);
+    checkIntersect(ki1, ki2, false);
+
+    ki2 = generateKeyInterval(500L, null);
+    checkIntersect(ki1, ki2, true);
+
+    ki2 = generateKeyInterval(2500L, null);
+    checkIntersect(ki1, ki2, false);
+
+    ki2 = generateKeyInterval(null, null);
+    checkIntersect(ki1, ki2, true);
+  }
+
+  private static OrcRawRecordMerger.KeyInterval generateKeyInterval(Long minRowId, Long maxRowId) {
+    RecordIdentifier min = null;
+    if (minRowId != null) {
+      min = new RecordIdentifier(1, 100, minRowId);
+    }
+    RecordIdentifier max = null;
+    if (maxRowId != null) {
+      max = new RecordIdentifier(1, 100, maxRowId);
+    }
+    return new OrcRawRecordMerger.KeyInterval(min, max);
+  }
+
+  private static void checkIntersect(OrcRawRecordMerger.KeyInterval ki1, OrcRawRecordMerger.KeyInterval ki2,
+      boolean isIntersect)
+  {
+    if (isIntersect) {
+      assertTrue(ki1.isIntersects(ki2));
+      assertTrue(ki2.isIntersects(ki1));
+    } else {
+      assertFalse(ki1.isIntersects(ki2));
+      assertFalse(ki2.isIntersects(ki1));
+    }
+  }
+
   private static void setRow(OrcStruct event,
                              int operation,
                              long originalTransaction,
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index 6fe47d5..ee81cd3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.File;
-import java.util.ArrayList;
+import java.io.IOException;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Properties;
@@ -955,7 +955,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
         new RecordIdentifier(0, bucketProperty, 2), filterOn);
   }
 
-    @Test
+  @Test
   public void testVectorizedOrcAcidRowBatchReader() throws Exception {
     conf.set("bucket_count", "1");
       conf.set(ValidTxnList.VALID_TXNS_KEY,
@@ -1093,4 +1093,63 @@ public class TestVectorizedOrcAcidRowBatchReader {
         null, null, true);
 
   }
+
+  @Test
+  public void testIsQualifiedDeleteDeltaForSplit() throws IOException {
+    // Original file
+    checkPath("00000_0", "delete_delta_000012_000012_0000", true);
+    checkPath("00000_0", "delete_delta_000001_000001", true);
+
+    // Original copy
+    checkPath("00000_0_copy", "delete_delta_0000012_0000012_0000", true);
+    checkPath("00000_0_copy", "delete_delta_0000001_0000001", true);
+
+    // Base file
+    checkPath("base_00000002/bucket_0000001", "delete_delta_0000012_0000012_0000", true);
+
+    // Compacted base file
+    checkPath("base_0000002_v123/bucket_00000_0", "delete_delta_0000012_0000012_0000", true);
+
+    // Delta file
+    checkPath("delta_00000002_0000002/bucket_00001_1", "delete_delta_0000012_0000012_0000", true);
+    checkPath("delta_00000002_0000002/bucket_00001_1", "delete_delta_0000002_0000002", false);
+    checkPath("delta_00000002_0000002/bucket_00001_1", "delete_delta_0000001_0000001_0001", false);
+
+    // Delta with statement id
+    checkPath("delta_0000002_0000002_124/bucket_00001", "delete_delta_000012_000012_0000", true);
+    checkPath("delta_0000002_0000002_124/bucket_00001", "delete_delta_000002_000002", false);
+    checkPath("delta_0000002_0000002_124/bucket_00001", "delete_delta_000001_000001_0001", false);
+
+    // Delta file with data loaded by LOAD DATA command
+    checkPath("delta_0000002_0000002_0000/000000_0", "delete_delta_0000012_0000012_0000", true);
+    checkPath("delta_0000002_0000002_0000/000000_0", "delete_delta_0000002_0000002", false);
+    checkPath("delta_0000002_0000002_0000/000000_0", "delete_delta_0000001_0000001_0001", false);
+
+    // Compacted delta
+    checkPath("delta_0000002_0000005_124/bucket_00001", "delete_delta_0000012_0000012_0000", true);
+    checkPath("delta_0000002_0000005_124/bucket_00001", "delete_delta_0000003_0000003", true);
+    checkPath("delta_0000002_0000005_124/bucket_00001", "delete_delta_0000002_0000005", true);
+    checkPath("delta_0000002_0000005_124/bucket_00001", "delete_delta_0000002_0000002", false);
+    checkPath("delta_0000002_0000005_124/bucket_00001", "delete_delta_0000001_0000001_0001", false);
+
+    // Multi statement transaction check
+    checkPath("delta_0000002_0000002_0000/bucket_00001", "delete_delta_0000002_0000002_0000", false);
+    checkPath("delta_0000002_0000002_0001/bucket_00001", "delete_delta_0000002_0000002_0000", false);
+    checkPath("delta_0000002_0000002_0001/bucket_00001", "delete_delta_0000002_0000002_0002", true);
+    checkPath("delta_0000002_0000002_0001/bucket_00001", "delete_delta_0000002_0000002", false);
+    checkPath("delta_0000002_0000002/bucket_00001", "delete_delta_0000002_0000002", false);
+    checkPath("delta_0000002_0000002/bucket_00001", "delete_delta_0000002_0000002_0001", true);
+  }
+
+  private void checkPath(String splitPath, String deleteDeltaPath, boolean expected) throws IOException {
+    String tableDir = ""; // could also be e.g. "hdfs://localhost:59316/base/warehouse/acid_test/"
+    AcidOutputFormat.Options ao = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(tableDir + splitPath), conf);
+    if (expected) {
+      assertTrue(VectorizedOrcAcidRowBatchReader.ColumnizedDeleteEventRegistry.isQualifiedDeleteDeltaForSplit(ao,
+          new Path(tableDir + deleteDeltaPath)));
+    } else {
+      assertFalse(VectorizedOrcAcidRowBatchReader.ColumnizedDeleteEventRegistry.isQualifiedDeleteDeltaForSplit(ao,
+          new Path(tableDir + deleteDeltaPath)));
+    }
+  }
 }