You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2021/11/08 13:12:17 UTC

[hive] branch master updated: HIVE-25651: Enable LLAP cache affinity for Iceberg ORC splits (Adam Szita, reviewed by Marton Bod and Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ed4049  HIVE-25651: Enable LLAP cache affinity for Iceberg ORC splits (Adam Szita, reviewed by Marton Bod and Peter Vary)
5ed4049 is described below

commit 5ed4049d5e1e325858d3fb45095529d46f9e5c0d
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Mon Nov 8 14:12:01 2021 +0100

    HIVE-25651: Enable LLAP cache affinity for Iceberg ORC splits (Adam Szita, reviewed by Marton Bod and Peter Vary)
---
 .../iceberg/mr/hive/HiveIcebergInputFormat.java    | 11 +++-
 .../apache/iceberg/mr/hive/HiveIcebergSplit.java   | 22 ++++++-
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   | 40 +++++++++---
 .../apache/iceberg/mr/TestIcebergInputFormats.java | 30 +++++++++
 .../positive/llap/vectorized_iceberg_read.q.out    |  4 +-
 .../hive/ql/exec/tez/HashableInputSplit.java       | 25 +++++++
 .../tez/HostAffinitySplitLocationProvider.java     | 40 ++++++------
 .../hive/ql/io/BucketizedHiveInputFormat.java      |  2 +-
 .../hadoop/hive/ql/io/CombineHiveInputFormat.java  |  2 +-
 .../apache/hadoop/hive/ql/io/HiveInputFormat.java  | 76 ++++++++++++----------
 .../ql/io/LlapCacheOnlyInputFormatInterface.java   |  7 ++
 .../org/apache/hadoop/hive/ql/plan/MapWork.java    |  3 +-
 .../tez/TestHostAffinitySplitLocationProvider.java |  7 +-
 13 files changed, 195 insertions(+), 74 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index fc66890..9d792e5 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -23,12 +23,15 @@ import java.io.IOException;
 import java.util.Arrays;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
@@ -55,7 +58,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
-    implements CombineHiveInputFormat.AvoidSplitCombination, VectorizedInputFormatInterface {
+    implements CombineHiveInputFormat.AvoidSplitCombination, VectorizedInputFormatInterface,
+    LlapCacheOnlyInputFormatInterface.VectorizedOnly {
 
   private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergInputFormat.class);
   private static final String HIVE_VECTORIZED_RECORDREADER_CLASS =
@@ -135,4 +139,9 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
     return new VectorizedSupport.Support[]{ VectorizedSupport.Support.DECIMAL_64 };
   }
 
+  @Override
+  public void injectCaches(FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf) {
+    // no-op for Iceberg
+  }
+
 }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
index a1e7b1a..a9827ff 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
@@ -19,17 +19,22 @@
 
 package org.apache.iceberg.mr.hive;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Collection;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.tez.HashableInputSplit;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.mr.mapreduce.IcebergSplit;
 import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+import org.apache.iceberg.relocated.com.google.common.primitives.Longs;
 import org.apache.iceberg.util.SerializationUtil;
 
 // Hive requires file formats to return splits that are instances of `FileSplit`.
-public class HiveIcebergSplit extends FileSplit implements IcebergSplitContainer {
+public class HiveIcebergSplit extends FileSplit implements IcebergSplitContainer, HashableInputSplit {
 
   private IcebergSplit innerSplit;
 
@@ -69,6 +74,21 @@ public class HiveIcebergSplit extends FileSplit implements IcebergSplitContainer
   }
 
   @Override
+  public byte[] getBytesForHash() {
+    Collection<FileScanTask> fileScanTasks = innerSplit.task().files();
+
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      for (FileScanTask task : fileScanTasks) {
+        baos.write(task.file().path().toString().getBytes());
+        baos.write(Longs.toByteArray(task.start()));
+      }
+      return baos.toByteArray();
+    } catch (IOException ioe) {
+      throw new RuntimeException("Couldn't produce hash input bytes for HiveIcebergSplit: " + this, ioe);
+    }
+  }
+
+  @Override
   public long getStart() {
     return 0;
   }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index fdba3df..b2176d2 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -29,6 +29,9 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.function.BiFunction;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -97,31 +100,40 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
     return new InputFormatConfig.ConfigBuilder(job.getConfiguration());
   }
 
-  @Override
-  public List<InputSplit> getSplits(JobContext context) {
-    Configuration conf = context.getConfiguration();
-    Table table = Optional
-        .ofNullable(HiveIcebergStorageHandler.table(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER)))
-        .orElseGet(() -> Catalogs.loadTable(conf));
-
+  private static TableScan createTableScan(Table table, Configuration conf) {
     TableScan scan = table.newScan()
-            .caseSensitive(conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT));
+        .caseSensitive(conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT));
     long snapshotId = conf.getLong(InputFormatConfig.SNAPSHOT_ID, -1);
     if (snapshotId != -1) {
       scan = scan.useSnapshot(snapshotId);
     }
+
     long asOfTime = conf.getLong(InputFormatConfig.AS_OF_TIMESTAMP, -1);
     if (asOfTime != -1) {
       scan = scan.asOfTime(asOfTime);
     }
+
     long splitSize = conf.getLong(InputFormatConfig.SPLIT_SIZE, 0);
     if (splitSize > 0) {
       scan = scan.option(TableProperties.SPLIT_SIZE, String.valueOf(splitSize));
     }
+
+    // In case of LLAP-based execution we ask Iceberg not to combine multiple fileScanTasks into one split.
+    // This is so that cache affinity can work, and each file(split) is executed/cached on always the same LLAP daemon.
+    MapWork mapWork = LlapHiveUtils.findMapWork((JobConf) conf);
+    if (mapWork != null && mapWork.getCacheAffinity()) {
+      // Iceberg splits logically consist of buckets, where the bucket size equals to openFileCost setting if the files
+      // assigned to such bucket are smaller. This is how Iceberg would combine multiple files into one split, so here
+      // we need to enforce the bucket size to be equal to split size to avoid file combination.
+      Long openFileCost = splitSize > 0 ? splitSize : TableProperties.SPLIT_SIZE_DEFAULT;
+      scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(openFileCost));
+    }
+
     String schemaStr = conf.get(InputFormatConfig.READ_SCHEMA);
     if (schemaStr != null) {
       scan.project(SchemaParser.fromJson(schemaStr));
     }
+
     String[] selectedColumns = conf.getStrings(InputFormatConfig.SELECTED_COLUMNS);
     if (selectedColumns != null) {
       scan.select(selectedColumns);
@@ -133,6 +145,18 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       scan = scan.filter(filter);
     }
 
+    return scan;
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) {
+    Configuration conf = context.getConfiguration();
+    Table table = Optional
+        .ofNullable(HiveIcebergStorageHandler.table(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER)))
+        .orElseGet(() -> Catalogs.loadTable(conf));
+
+    TableScan scan = createTableScan(table, conf);
+
     List<InputSplit> splits = Lists.newArrayList();
     boolean applyResidual = !conf.getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);
     InputFormatConfig.InMemoryDataModel model = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
index bdc1014..bf2f29c 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
@@ -29,6 +29,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TaskAttemptID;
@@ -52,6 +56,7 @@ import org.apache.iceberg.data.Record;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
 import org.apache.iceberg.mr.mapreduce.IcebergInputFormat;
@@ -70,6 +75,8 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(Parameterized.class)
 public class TestIcebergInputFormats {
@@ -371,6 +378,29 @@ public class TestIcebergInputFormats {
     testInputFormat.create(builder.conf()).validate(expectedRecords);
   }
 
+  @Test
+  public void testDeriveLlapSetsCacheAffinityForIcebergInputFormat() {
+    MapWork mapWork = new MapWork();
+    PartitionDesc partitionDesc = new PartitionDesc();
+    partitionDesc.setInputFileFormatClass(HiveIcebergInputFormat.class);
+    mapWork.addPathToPartitionInfo(new Path("/tmp"), partitionDesc);
+    Configuration job = new Configuration(false);
+    HiveConf.setVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, "true");
+    HiveConf.setBoolVar(job, HiveConf.ConfVars.LLAP_IO_NONVECTOR_WRAPPER_ENABLED, true);
+
+    mapWork.setVectorMode(true);
+    mapWork.deriveLlap(job, false);
+
+    assertTrue("Cache affinity should be set for HiveIcebergInputFormat when LLAP and vectorization is enabled",
+        mapWork.getCacheAffinity());
+
+    mapWork.setVectorMode(false);
+    mapWork.deriveLlap(job, false);
+
+    assertFalse("Cache affinity should be disabled for HiveIcebergInputFormat when LLAP is on, but vectorization not",
+        mapWork.getCacheAffinity());
+  }
+
   // TODO - Capture template type T in toString method: https://github.com/apache/iceberg/issues/1542
   public abstract static class TestInputFormat<T> {
 
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read.q.out
index d4c64f6..cff315c 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read.q.out
@@ -68,7 +68,7 @@ STAGE PLANS:
                       Statistics: Num rows: 5 Data size: 460 Basic stats: COMPLETE Column stats: COMPLETE
                       value expressions: _col1 (type: int)
             Execution mode: vectorized, llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs (cache only)
         Reducer 2 
             Execution mode: vectorized, llap
             Reduce Operator Tree:
@@ -185,7 +185,7 @@ STAGE PLANS:
                       Statistics: Num rows: 1 Data size: 372 Basic stats: COMPLETE Column stats: COMPLETE
                       value expressions: _col9 (type: float)
             Execution mode: vectorized, llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs (cache only)
         Reducer 2 
             Execution mode: vectorized, llap
             Reduce Operator Tree:
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashableInputSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashableInputSplit.java
new file mode 100644
index 0000000..8b2754c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashableInputSplit.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+/**
+ * An InputSplit type that has a custom/specific way of producing its serialized content, to be later used as input
+ * of a hash function. Used for LLAP cache affinity, so that a certain split always ends up on the same executor.
+ */
+public interface HashableInputSplit {
+
+  byte[] getBytesForHash();
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
index 6be7226..b75c4da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
@@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.hash.Hashing;
 
-import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.split.SplitLocationProvider;
@@ -62,21 +61,17 @@ public class HostAffinitySplitLocationProvider implements SplitLocationProvider
       return split.getLocations();
     }
     FileSplit fsplit = (FileSplit) split;
-    String splitDesc = "Split at " + fsplit.getPath() + " with offset= " + fsplit.getStart()
-        + ", length=" + fsplit.getLength();
-    String location = locations.get(determineLocation(
-        locations, fsplit.getPath().toString(), fsplit.getStart(), splitDesc));
+    String location = locations.get(determineLocation(locations, fsplit));
     return (location != null) ? new String[] { location } : null;
   }
 
   @VisibleForTesting
-  public static int determineLocation(
-      List<String> locations, String path, long start, String desc) {
-    byte[] bytes = getHashInputForSplit(path, start);
+  public static int determineLocation(List<String> locations, FileSplit fsplit) {
+    byte[] bytes = getHashInputForSplit(fsplit);
     long hash1 = hash1(bytes);
     int index = Hashing.consistentHash(hash1, locations.size());
     String location = locations.get(index);
-    LOG.debug("{} mapped to index={}, location={}", desc, index, location);
+    LOG.debug("{} mapped to index={}, location={}", getSplitDescForDebug(fsplit), index, location);
     int iter = 1;
     long hash2 = 0;
     // Since our probing method is totally bogus, give up after some time.
@@ -87,23 +82,18 @@ public class HostAffinitySplitLocationProvider implements SplitLocationProvider
       // Note that this is not real double hashing since we have consistent hash on top.
       index = Hashing.consistentHash(hash1 + iter * hash2, locations.size());
       location = locations.get(index);
-      LOG.debug("{} remapped to index={}, location={}", desc, index, location);
+      LOG.debug("{} remapped to index={}, location={}", getSplitDescForDebug(fsplit), index, location);
       ++iter;
     }
     return index;
   }
 
-  private static byte[] getHashInputForSplit(String path, long start) {
-    // Explicitly using only the start offset of a split, and not the length. Splits generated on
-    // block boundaries and stripe boundaries can vary slightly. Try hashing both to the same node.
-    // There is the drawback of potentially hashing the same data on multiple nodes though, when a
-    // large split is sent to 1 node, and a second invocation uses smaller chunks of the previous
-    // large split and send them to different nodes.
-    byte[] pathBytes = path.getBytes();
-    byte[] allBytes = new byte[pathBytes.length + 8];
-    System.arraycopy(pathBytes, 0, allBytes, 0, pathBytes.length);
-    SerDeUtils.writeLong(allBytes, pathBytes.length, start >> 3);
-    return allBytes;
+  private static byte[] getHashInputForSplit(FileSplit fsplit) {
+    if (fsplit instanceof HashableInputSplit) {
+      return ((HashableInputSplit)fsplit).getBytesForHash();
+    } else {
+      throw new RuntimeException("Split is not a HashableInputSplit: " + fsplit);
+    }
   }
 
   private static long hash1(byte[] bytes) {
@@ -115,4 +105,12 @@ public class HostAffinitySplitLocationProvider implements SplitLocationProvider
     final int PRIME = 1366661;
     return Murmur3.hash64(bytes, 0, bytes.length, PRIME);
   }
+
+  private static String getSplitDescForDebug(FileSplit fsplit) {
+    if (LOG.isDebugEnabled()) {
+      return "Split at " + fsplit.getPath() + " with offset= " + fsplit.getStart() + ", length=" + fsplit.getLength();
+    } else {
+      return null;
+    }
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
index b2d1e0d..3926618 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
@@ -68,7 +68,7 @@ public class BucketizedHiveInputFormat<K extends WritableComparable, V extends W
       throw new IOException("cannot find class " + inputFormatClassName);
     }
 
-    pushProjectionsAndFiltersAndAsOf(job, inputFormatClass, hsplit.getPath());
+    pushProjectionsAndFiltersAndAsOf(job, hsplit.getPath());
 
     InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index a88424f..5c44960 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -710,7 +710,7 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
       throw new IOException("cannot find class " + inputFormatClassName);
     }
 
-    pushProjectionsAndFiltersAndAsOf(job, inputFormatClass, hsplit.getPath(0));
+    pushProjectionsAndFiltersAndAsOf(job, hsplit.getPath(0));
 
     return ShimLoader.getHadoopShims().getCombineFileInputFormat()
         .getRecordReader(job,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index d7491eb..e0787f5 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.HashableInputSplit;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -55,6 +56,7 @@ import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
@@ -141,7 +143,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
    * "map.input.file" in MapTask.
    */
   public static class HiveInputSplit extends FileSplit implements InputSplit,
-      Configurable {
+      Configurable, HashableInputSplit {
 
 
     InputSplit inputSplit;
@@ -238,6 +240,24 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     public void setConf(Configuration conf) {
       this.conf = conf;
     }
+
+    @Override
+    public byte[] getBytesForHash() {
+      if (inputSplit instanceof HashableInputSplit) {
+        return ((HashableInputSplit)inputSplit).getBytesForHash();
+      } else {
+        // Explicitly using only the start offset of a split, and not the length. Splits generated on
+        // block boundaries and stripe boundaries can vary slightly. Try hashing both to the same node.
+        // There is the drawback of potentially hashing the same data on multiple nodes though, when a
+        // large split is sent to 1 node, and a second invocation uses smaller chunks of the previous
+        // large split and send them to different nodes.
+        byte[] pathBytes = getPath().toString().getBytes();
+        byte[] allBytes = new byte[pathBytes.length + 8];
+        System.arraycopy(pathBytes, 0, allBytes, 0, pathBytes.length);
+        SerDeUtils.writeLong(allBytes, pathBytes.length, getStart() >> 3);
+        return allBytes;
+      }
+    }
   }
 
   @Override
@@ -337,7 +357,10 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
         (!checkVector || BatchToRowInputFormat.class.isAssignableFrom(clazz));
   }
 
-  public static boolean canInjectCaches(Class<? extends InputFormat> clazz) {
+  public static boolean canInjectCaches(Class<? extends InputFormat> clazz, boolean isVectorized) {
+    if (LlapCacheOnlyInputFormatInterface.VectorizedOnly.class.isAssignableFrom(clazz)) {
+      return isVectorized;
+    }
     return LlapCacheOnlyInputFormatInterface.class.isAssignableFrom(clazz);
   }
 
@@ -407,7 +430,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     }
 
     Path splitPath = hsplit.getPath();
-    pushProjectionsAndFiltersAndAsOf(job, inputFormatClass, splitPath, nonNative);
+    pushProjectionsAndFiltersAndAsOf(job, splitPath);
 
     InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
     if (HiveConf.getBoolVar(job, ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon())) {
@@ -953,13 +976,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     }
   }
 
-  protected void pushProjectionsAndFiltersAndAsOf(JobConf jobConf, Class inputFormatClass,
-      Path splitPath) {
-    pushProjectionsAndFiltersAndAsOf(jobConf, inputFormatClass, splitPath, false);
-  }
-
-  protected void pushProjectionsAndFiltersAndAsOf(JobConf jobConf, Class inputFormatClass,
-      Path splitPath, boolean nonNative) {
+  protected void pushProjectionsAndFiltersAndAsOf(JobConf jobConf, Path splitPath) {
     Path splitPathWithNoSchema = Path.getPathWithoutSchemeAndAuthority(splitPath);
     if (this.mrwork == null) {
       init(job);
@@ -977,32 +994,23 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     while (iterator.hasNext()) {
       Entry<Path, List<String>> entry = iterator.next();
       Path key = entry.getKey();
+      // Note for HIVE-1903: for non-native tables we might only see a table location provided as path in splitPath.
+      // In this case the code part below should still work, as the "key" will be an exact match for splitPath.
+      // Also: we should not anticipate table paths to be under other tables' locations.
       boolean match;
-      if (nonNative) {
-        // For non-native tables, we need to do an exact match to avoid
-        // HIVE-1903.  (The table location contains no files, and the string
-        // representation of its path does not have a trailing slash.)
-        match =
-          splitPath.equals(key) || splitPathWithNoSchema.equals(key);
-      } else {
-        // But for native tables, we need to do a prefix match for
-        // subdirectories.  (Unlike non-native tables, prefix mixups don't seem
-        // to be a potential problem here since we are always dealing with the
-        // path to something deeper than the table location.)
-        if (pathsSize > 1) {
-          // Comparing paths multiple times creates lots of objects &
-          // creates GC pressure for tables having large number of partitions.
-          // In such cases, use pre-computed paths for comparison
-          if (splitParentPaths == null) {
-            splitParentPaths = new HashSet<>();
-            FileUtils.populateParentPaths(splitParentPaths, splitPath);
-            FileUtils.populateParentPaths(splitParentPaths, splitPathWithNoSchema);
-          }
-          match = splitParentPaths.contains(key);
-        } else {
-          match = FileUtils.isPathWithinSubtree(splitPath, key)
-              || FileUtils.isPathWithinSubtree(splitPathWithNoSchema, key);
+      if (pathsSize > 1) {
+        // Comparing paths multiple times creates lots of objects &
+        // creates GC pressure for tables having large number of partitions.
+        // In such cases, use pre-computed paths for comparison
+        if (splitParentPaths == null) {
+          splitParentPaths = new HashSet<>();
+          FileUtils.populateParentPaths(splitParentPaths, splitPath);
+          FileUtils.populateParentPaths(splitParentPaths, splitPathWithNoSchema);
         }
+        match = splitParentPaths.contains(key);
+      } else {
+        match = FileUtils.isPathWithinSubtree(splitPath, key)
+            || FileUtils.isPathWithinSubtree(splitPathWithNoSchema, key);
       }
       if (match) {
         List<String> list = entry.getValue();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java
index 84a4f06..18ab79f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java
@@ -25,4 +25,11 @@ import org.apache.hadoop.hive.common.io.FileMetadataCache;
 /** Marker interface for LLAP IO. */
 public interface LlapCacheOnlyInputFormatInterface {
   void injectCaches(FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf);
+
+  /**
+   * For inputformats that can only accept LLAP caching with vectorization turned on.
+   */
+  interface VectorizedOnly extends LlapCacheOnlyInputFormatInterface {
+
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index dbbd8c9..17e1053 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -311,7 +311,8 @@ public class MapWork extends BaseWork {
           } else {
             hasLlap = true;
           }
-        } else if (isLlapOn && HiveInputFormat.canInjectCaches(inputFormatClass)) {
+        } else if (isLlapOn && HiveInputFormat.canInjectCaches(inputFormatClass,
+            Utilities.getIsVectorized(conf, this))) {
           hasCacheOnly = true;
         } else {
           hasNonLlap = true;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
index 61c98b7..e40a0a6 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
@@ -215,8 +216,7 @@ public class TestHostAffinitySplitLocationProvider {
     int[] hitCounts = new int[locs];
     for (int splitIx = 0; splitIx < splits.length; ++splitIx) {
       state.set(0);
-      int index = HostAffinitySplitLocationProvider.determineLocation(partLocs,
-          splits[splitIx].getPath().toString(), splits[splitIx].getStart(), null);
+      int index = HostAffinitySplitLocationProvider.determineLocation(partLocs, splits[splitIx]);
       ++hitCounts[index];
     }
     SummaryStatistics ss = new SummaryStatistics();
@@ -320,8 +320,7 @@ public class TestHostAffinitySplitLocationProvider {
     doReturn(new Path(fakePathString)).when(fileSplit).getPath();
     doReturn(locations).when(fileSplit).getLocations();
 
-    doReturn(locations).when(fileSplit).getLocations();
-    return fileSplit;
+    return new HiveInputFormat.HiveInputSplit(fileSplit, "unused");
   }