Posted to commits@hive.apache.org by ss...@apache.org on 2016/01/25 00:37:00 UTC

[8/8] hive git commit: HIVE-12470. Allow splits to provide custom consistent locations, instead of being tied to data locality. (Siddharth Seth, reviewed by Prasanth Jayachandran) (cherry picked from commit c89b4b12e4d8fc03e64493e6c821b3bffee6f236)

HIVE-12470. Allow splits to provide custom consistent locations, instead of being tied to data locality. (Siddharth Seth, reviewed by Prasanth Jayachandran)
(cherry picked from commit c89b4b12e4d8fc03e64493e6c821b3bffee6f236)
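
In short: instead of handing Tez the HDFS locations of each split, Hive can now hash a split's identity (path + start offset) onto a fixed, ordered list of LLAP daemon hosts, so every process computes the same location for the same split. Below is a minimal sketch of that idea, simplified from the HostAffinitySplitLocationProvider added in this commit; the hashCode() stand-in and host names are hypothetical, and the real code uses Murmur3.hash64 over (offset, path).

    import java.util.Arrays;
    import java.util.List;

    public class ConsistentLocationSketch {
      // "hosts" must have the same order in every process for the mapping
      // to be consistent across AMs.
      static String locationFor(String path, long offset, List<String> hosts) {
        long hash = (path + "@" + offset).hashCode(); // stand-in for Murmur3.hash64
        int index = Math.abs((int) (hash % hosts.size()));
        return hosts.get(index);
      }

      public static void main(String[] args) {
        List<String> hosts = Arrays.asList("node-0", "node-1", "node-2");
        // The same (path, offset) always resolves to the same host.
        System.out.println(locationFor("/warehouse/t/000000_0", 0L, hosts));
        System.out.println(locationFor("/warehouse/t/000000_0", 0L, hosts));
      }
    }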


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5c071544
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5c071544
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5c071544

Branch: refs/heads/branch-2.0
Commit: 5c071544deead530452ed1c044bc86878802a296
Parents: 4f76d46
Author: Siddharth Seth <ss...@apache.org>
Authored: Sun Jan 24 15:25:54 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sun Jan 24 15:30:24 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   7 +-
 .../hive/llap/registry/ServiceInstanceSet.java  |   8 +
 .../registry/impl/LlapFixedRegistryImpl.java    |  21 ++-
 .../llap/registry/impl/LlapRegistryService.java |  38 ++++-
 .../registry/impl/LlapYarnRegistryImpl.java     |  75 +++++++--
 .../hive/ql/exec/tez/CustomPartitionVertex.java |   9 +-
 .../hive/ql/exec/tez/HiveSplitGenerator.java    |  10 +-
 .../tez/HostAffinitySplitLocationProvider.java  |  86 ++++++++++
 .../hadoop/hive/ql/exec/tez/SplitGrouper.java   |  24 +--
 .../apache/hadoop/hive/ql/exec/tez/Utils.java   |  58 +++++++
 .../TestHostAffinitySplitLocationProvider.java  | 163 +++++++++++++++++++
 11 files changed, 464 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 26ba4f0..14b86e3 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2270,7 +2270,6 @@ public class HiveConf extends Configuration {
         "Whether to generate the splits locally or in the AM (tez only)"),
     HIVE_TEZ_GENERATE_CONSISTENT_SPLITS("hive.tez.input.generate.consistent.splits", true,
         "Whether to generate consistent split locations when generating splits in the AM"),
-
     HIVE_PREWARM_ENABLED("hive.prewarm.enabled", false, "Enables container prewarm for Tez/Spark (Hadoop 2 only)"),
     HIVE_PREWARM_NUM_CONTAINERS("hive.prewarm.numcontainers", 10, "Controls the number of containers to prewarm for Tez/Spark (Hadoop 2 only)"),
 
@@ -2483,7 +2482,7 @@ public class HiveConf extends Configuration {
        new TimeValidator(TimeUnit.SECONDS),
       "How long to delay before cleaning up query files in LLAP (in seconds, for debugging).",
       "llap.file.cleanup.delay-seconds"),
-    LLAP_DAEMON_SERVICE_HOSTS("hive.llap.daemon.service.hosts", "",
+    LLAP_DAEMON_SERVICE_HOSTS("hive.llap.daemon.service.hosts", null,
       "Explicitly specified hosts to use for LLAP scheduling. Useful for testing. By default,\n" +
       "YARN registry is used.", "llap.daemon.service.hosts"),
     LLAP_DAEMON_SERVICE_REFRESH_INTERVAL("hive.llap.daemon.service.refresh.interval.sec", "60s",
@@ -2550,6 +2549,10 @@ public class HiveConf extends Configuration {
       "llap.daemon.service.port"),
     LLAP_DAEMON_WEB_SSL("hive.llap.daemon.web.ssl", false,
       "Whether LLAP daemon web UI should use SSL.", "llap.daemon.service.ssl"),
+    LLAP_CLIENT_CONSISTENT_SPLITS("hive.llap.client.consistent.splits",
+        false,
+        "Whether to setup split locations to match nodes on which llap daemons are running," +
+            " instead of using the locations provided by the split itself"),
 
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
       "60s", new TimeValidator(TimeUnit.SECONDS),

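Taken together, the two settings above would be used roughly as follows on the client side. This is a hedged sketch: the host names and the "@llap0" registry name are hypothetical, but the property names and the "@" convention come from this commit.

    import org.apache.hadoop.conf.Configuration;

    public class LlapSplitConfSketch {
      public static Configuration configure() {
        Configuration conf = new Configuration();
        // Either an explicit host list (useful for testing) ...
        conf.set("hive.llap.daemon.service.hosts", "node1,node2,node3");
        // ... or a YARN-registry-backed service name, prefixed with '@':
        // conf.set("hive.llap.daemon.service.hosts", "@llap0");
        conf.setBoolean("hive.llap.client.consistent.splits", true);
        return conf;
      }
    }
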
http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
index 388b5f3..be811eb 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
@@ -14,6 +14,7 @@
 package org.apache.hadoop.hive.llap.registry;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -30,6 +31,13 @@ public interface ServiceInstanceSet {
   public Map<String, ServiceInstance> getAll();
 
   /**
+   * Gets a list containing all the instances. This list has the same iteration order across
+   * different processes, assuming the list of registry entries is the same.
+   * @return all known instances, in a consistent order
+   */
+  public List<ServiceInstance> getAllInstancesOrdered();
+
+  /**
    * Get an instance by worker identity.
    * 
    * @param name

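A hypothetical consumer of the new ordering contract: if every process sees the same registry entries in the same order, an index computed from a split hash resolves to the same host everywhere. ServiceInstance.getHost() is used the same way in Utils.java later in this commit.

    import java.util.List;
    import org.apache.hadoop.hive.llap.registry.ServiceInstance;

    final class OrderedLookupSketch {
      // Pure function of (splitHash, ordered instance list): two AMs with the
      // same registry view agree on the host.
      static String hostFor(long splitHash, List<ServiceInstance> ordered) {
        int index = Math.abs((int) (splitHash % ordered.size()));
        return ordered.get(index).getHost();
      }
    }
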
http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index ef9de32..92044bb 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -17,8 +17,13 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -176,7 +181,8 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
 
   private final class FixedServiceInstanceSet implements ServiceInstanceSet {
 
-    private final Map<String, ServiceInstance> instances = new HashMap<String, ServiceInstance>();
+    // LinkedHashMap has a repeatable iteration order.
+    private final Map<String, ServiceInstance> instances = new LinkedHashMap<>();
 
     public FixedServiceInstanceSet() {
       for (String host : hosts) {
@@ -191,6 +197,19 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
     }
 
     @Override
+    public List<ServiceInstance> getAllInstancesOrdered() {
+      List<ServiceInstance> list = new LinkedList<>();
+      list.addAll(instances.values());
+      Collections.sort(list, new Comparator<ServiceInstance>() {
+        @Override
+        public int compare(ServiceInstance o1, ServiceInstance o2) {
+          return o1.getWorkerIdentity().compareTo(o2.getWorkerIdentity());
+        }
+      });
+      return list;
+    }
+
+    @Override
     public ServiceInstance getInstance(String name) {
       return instances.get(name);
     }

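Illustrative note on the sort above: ordering by worker identity makes the result a pure function of the instance set rather than of insertion order, so independent JVMs agree. A tiny standalone sketch with hypothetical worker identities:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class OrderingSketch {
      public static void main(String[] args) {
        List<String> ids = Arrays.asList("worker-2", "worker-0", "worker-1");
        Collections.sort(ids);
        // [worker-0, worker-1, worker-2] in every JVM, regardless of
        // the order the registry handed the entries back.
        System.out.println(ids);
      }
    }
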
http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index 740f373..907faed 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -14,11 +14,13 @@
 package org.apache.hadoop.hive.llap.registry.impl;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
 import org.apache.hadoop.service.AbstractService;
@@ -32,11 +34,45 @@ public class LlapRegistryService extends AbstractService {
   private ServiceRegistry registry = null;
   private final boolean isDaemon;
 
+  private static final Map<String, LlapRegistryService> yarnRegistries = new HashMap<>();
+
   public LlapRegistryService(boolean isDaemon) {
     super("LlapRegistryService");
     this.isDaemon = isDaemon;
   }
 
+  /**
+   * Helper method to get a registry client for reading from the registry.
+   * This should not be used by LLAP daemons.
+   *
+   * @param conf {@link Configuration} instance which contains service registry information.
+   * @return a started {@link LlapRegistryService} client
+   */
+  public static synchronized LlapRegistryService getClient(Configuration conf) {
+    String hosts = HiveConf.getTrimmedVar(conf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+    Preconditions.checkNotNull(hosts, ConfVars.LLAP_DAEMON_SERVICE_HOSTS.toString() + " must be defined");
+    LlapRegistryService registry;
+    if (hosts.startsWith("@")) {
+      // Caching instances only in case of the YARN registry. Each host-based list gets its own copy.
+      String name = hosts.substring(1);
+      if (yarnRegistries.containsKey(name)) {
+        registry = yarnRegistries.get(name);
+      } else {
+        registry = new LlapRegistryService(false);
+        registry.init(conf);
+        registry.start();
+        yarnRegistries.put(name, registry);
+      }
+    } else {
+      registry = new LlapRegistryService(false);
+      registry.init(conf);
+      registry.start();
+    }
+    LOG.info("Using LLAP registry (client) type: " + registry);
+    return registry;
+  }
+
+
   @Override
   public void serviceInit(Configuration conf) {
     String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);

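A hedged usage sketch for the new helper; getInstances().getAllInstancesOrdered() is the same call chain Utils.getSplitLocationProvider() uses later in this commit.

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.llap.registry.ServiceInstance;
    import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;

    public class RegistryClientSketch {
      static List<ServiceInstance> orderedInstances(Configuration conf) throws IOException {
        // With an "@name" value the client is cached and shared across callers;
        // with an explicit host list a fresh instance is created per call.
        LlapRegistryService registry = LlapRegistryService.getClient(conf);
        return registry.getInstances().getAllInstancesOrdered();
      }
    }
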
http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
index fc2ebf2..efe31cc 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
@@ -20,15 +20,20 @@ import java.net.MalformedURLException;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
@@ -269,16 +274,47 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
 
     // LinkedHashMap to retain iteration order.
     private final Map<String, ServiceInstance> instances = new LinkedHashMap<>();
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+    private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
 
     @Override
-    public synchronized Map<String, ServiceInstance> getAll() {
+    public Map<String, ServiceInstance> getAll() {
       // Return a copy. Instances may be modified during a refresh.
-      return new LinkedHashMap<>(instances);
+      readLock.lock();
+      try {
+        return new LinkedHashMap<>(instances);
+      } finally {
+        readLock.unlock();
+      }
     }
 
     @Override
-    public synchronized ServiceInstance getInstance(String name) {
-      return instances.get(name);
+    public List<ServiceInstance> getAllInstancesOrdered() {
+      List<ServiceInstance> list = new LinkedList<>();
+      readLock.lock();
+      try {
+        list.addAll(instances.values());
+      } finally {
+        readLock.unlock();
+      }
+      Collections.sort(list, new Comparator<ServiceInstance>() {
+        @Override
+        public int compare(ServiceInstance o1, ServiceInstance o2) {
+          return o1.getWorkerIdentity().compareTo(o2.getWorkerIdentity());
+        }
+      });
+      return list;
+    }
+
+    @Override
+    public ServiceInstance getInstance(String name) {
+      readLock.lock();
+      try {
+        return instances.get(name);
+      } finally {
+        readLock.unlock();
+      }
     }
 
     @Override
@@ -290,7 +326,8 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
       Map<String, ServiceRecord> records =
           RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path));
       // Synchronize after reading the service records from the external service (ZK)
-      synchronized (this) {
+      writeLock.lock();
+      try {
         Set<String> latestKeys = new HashSet<String>();
         LOG.info("Starting to refresh ServiceInstanceSet " + System.identityHashCode(this));
         for (ServiceRecord rec : records.values()) {
@@ -333,28 +370,34 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
         } else {
           this.instances.putAll(freshInstances);
         }
+      } finally {
+        writeLock.unlock();
       }
     }
 
     @Override
-    public synchronized Set<ServiceInstance> getByHost(String host) {
+    public Set<ServiceInstance> getByHost(String host) {
       // TODO Maybe store this as a map which is populated during construction, to avoid walking
       // the map on each request.
+      readLock.lock();
       Set<ServiceInstance> byHost = new HashSet<ServiceInstance>();
-
-      for (ServiceInstance i : instances.values()) {
-        if (host.equals(i.getHost())) {
-          // all hosts in instances should be alive in this impl
-          byHost.add(i);
+      try {
+        for (ServiceInstance i : instances.values()) {
+          if (host.equals(i.getHost())) {
+            // all hosts in instances should be alive in this impl
+            byHost.add(i);
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Locality comparing " + host + " to " + i.getHost());
+          }
         }
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Locality comparing " + host + " to " + i.getHost());
+          LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
         }
+        return byHost;
+      } finally {
+        readLock.unlock();
       }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
-      }
-      return byHost;
     }
   }
 

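The locking pattern adopted above, distilled into an illustrative standalone class (a sketch, not the actual implementation): readers snapshot the map under the read lock, while the periodic refresh mutates it under the write lock, so concurrent reads no longer serialize behind a single monitor.

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    final class RegistrySnapshotSketch<V> {
      private final Map<String, V> instances = new LinkedHashMap<>();
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

      Map<String, V> getAll() {
        lock.readLock().lock();
        try {
          // Return a copy; the refresh thread may mutate the map.
          return new LinkedHashMap<>(instances);
        } finally {
          lock.readLock().unlock();
        }
      }

      void replaceAll(Map<String, V> fresh) {
        lock.writeLock().lock();
        try {
          instances.clear();
          instances.putAll(fresh);
        } finally {
          lock.writeLock().unlock();
        }
      }
    }
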
http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index e9c14b1..45d3cd1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -33,6 +33,7 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 
 import com.google.common.collect.LinkedListMultimap;
+import org.apache.hadoop.mapred.split.SplitLocationProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -271,25 +272,27 @@ public class CustomPartitionVertex extends VertexManagerPlugin {
           HashMultimap.<Integer, InputSplit> create();
       boolean secondLevelGroupingDone = false;
       if ((mainWorkName.isEmpty()) || (inputName.compareTo(mainWorkName) == 0)) {
+        SplitLocationProvider splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG);
         for (Integer key : bucketToInitialSplitMap.keySet()) {
           InputSplit[] inputSplitArray =
               (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
           Multimap<Integer, InputSplit> groupedSplit =
               grouper.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
-                  availableSlots, inputName, mainWorkName.isEmpty());
+                  availableSlots, inputName, mainWorkName.isEmpty(), splitLocationProvider);
           if (mainWorkName.isEmpty() == false) {
             Multimap<Integer, InputSplit> singleBucketToGroupedSplit =
                 HashMultimap.<Integer, InputSplit> create();
             singleBucketToGroupedSplit.putAll(key, groupedSplit.values());
             groupedSplit =
                 grouper.group(jobConf, singleBucketToGroupedSplit, availableSlots,
-                    HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES));
+                    HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES), null);
             secondLevelGroupingDone = true;
           }
           bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
         }
         processAllEvents(inputName, bucketToGroupedSplitMap, secondLevelGroupingDone);
       } else {
+        SplitLocationProvider splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG);
         // do not group across files in case of side work because there is only 1 KV reader per
         // grouped split. This would affect SMB joins where we want to find the smallest key in
         // all the bucket files.
@@ -298,7 +301,7 @@ public class CustomPartitionVertex extends VertexManagerPlugin {
               (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
           Multimap<Integer, InputSplit> groupedSplit =
               grouper.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
-                    availableSlots, inputName, false);
+                    availableSlots, inputName, false, splitLocationProvider);
             bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
         }
         /*

http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 8ebfe69..8e48c2e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.split.SplitLocationProvider;
 import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezUtils;
@@ -79,6 +80,7 @@ public class HiveSplitGenerator extends InputInitializer {
   private final MRInputUserPayloadProto userPayloadProto;
   private final MapWork work;
   private final SplitGrouper splitGrouper = new SplitGrouper();
+  private final SplitLocationProvider splitLocationProvider;
 
   public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException,
       SerDeException {
@@ -91,6 +93,9 @@ public class HiveSplitGenerator extends InputInitializer {
 
     this.jobConf = new JobConf(conf);
 
+    this.splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG);
+    LOG.info("SplitLocationProvider: " + splitLocationProvider);
+
     // Read all credentials into the credentials instance stored in JobConf.
     ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
 
@@ -149,6 +154,7 @@ public class HiveSplitGenerator extends InputInitializer {
             conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
                 TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
 
+        // Raw splits
         InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
         // Sort the splits, so that subsequent grouping is consistent.
         Arrays.sort(splits, new InputSplitComparator());
@@ -160,10 +166,10 @@ public class HiveSplitGenerator extends InputInitializer {
         }
 
         Multimap<Integer, InputSplit> groupedSplits =
-            splitGrouper.generateGroupedSplits(jobConf, conf, splits, waves, availableSlots);
+            splitGrouper.generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, splitLocationProvider);
         // And finally return them in a flat array
         InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]);
-        LOG.info("Number of grouped splits: " + flatSplits.length);
+        LOG.info("Number of split groups: " + flatSplits.length);
 
         List<TaskLocationHint> locationHints = splitGrouper.createTaskLocationHints(flatSplits, generateConsistentSplits);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
new file mode 100644
index 0000000..c06499e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.split.SplitLocationProvider;
+import org.apache.hive.common.util.Murmur3;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This maps a split (path + offset) to an index based on the number of locations provided.
+ *
+ * If locations do not change across jobs, the intention is to map the same split to the same node.
+ *
+ * A known problem arises when nodes change (are added, removed, or temporarily removed and re-added):
+ * this changes the number and position of locations, and invalidates almost the entire cache.
+ *
+ * TODO: Support for consistent hashing when combining the split location generator and the ServiceRegistry.
+ *
+ */
+public class HostAffinitySplitLocationProvider implements SplitLocationProvider {
+
+  private final Logger LOG = LoggerFactory.getLogger(HostAffinitySplitLocationProvider.class);
+  private final boolean isDebugEnabled = LOG.isDebugEnabled();
+
+  private final String[] knownLocations;
+
+  public HostAffinitySplitLocationProvider(String[] knownLocations) {
+    Preconditions.checkState(knownLocations != null && knownLocations.length != 0,
+        HostAffinitySplitLocationProvider.class.getName() +
+            "needs at least 1 location to function");
+    this.knownLocations = knownLocations;
+  }
+
+  @Override
+  public String[] getLocations(InputSplit split) throws IOException {
+    if (split instanceof FileSplit) {
+      FileSplit fsplit = (FileSplit) split;
+      long hash = generateHash(fsplit.getPath().toString(), fsplit.getStart());
+      int indexRaw = (int) (hash % knownLocations.length);
+      int index = Math.abs(indexRaw);
+      if (isDebugEnabled) {
+        LOG.debug(
+            "Split at " + fsplit.getPath() + " with offset= " + fsplit.getStart() + ", length=" +
+                fsplit.getLength() + " mapped to index=" + index + ", location=" +
+                knownLocations[index]);
+      }
+      return new String[]{knownLocations[index]};
+    } else {
+      if (isDebugEnabled) {
+        LOG.debug("Split: " + split + " is not a FileSplit. Using default locations");
+      }
+      return split.getLocations();
+    }
+  }
+
+  private long generateHash(String path, long startOffset) throws IOException {
+    // Explicitly use only the start offset of a split, not its length.
+    // Splits generated on block boundaries and on stripe boundaries can vary slightly in length;
+    // hashing on (path, offset) maps both variants to the same node.
+    // The drawback is that the same data may be hashed to multiple nodes: when a large split
+    // goes to one node and a second invocation breaks the same range into smaller chunks,
+    // those chunks may be sent to different nodes.
+    DataOutputBuffer dob = new DataOutputBuffer();
+    dob.writeLong(startOffset);
+    dob.writeUTF(path);
+    return Murmur3.hash64(dob.getData(), 0, dob.getLength());
+  }
+}

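A hedged usage sketch for the new provider; the host names and path are hypothetical. A FileSplit always resolves to exactly one host from the configured list, and repeated calls return the same host.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.tez.HostAffinitySplitLocationProvider;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.split.SplitLocationProvider;

    public class AffinitySketch {
      public static void main(String[] args) throws Exception {
        SplitLocationProvider provider = new HostAffinitySplitLocationProvider(
            new String[] {"node-0", "node-1", "node-2"});
        FileSplit split = new FileSplit(
            new Path("/warehouse/t/000000_0"), 0L, 256L * 1024 * 1024, new String[0]);
        System.out.println(provider.getLocations(split)[0]); // some host, e.g. node-1
        System.out.println(provider.getLocations(split)[0]); // the same host again
      }
    }
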
http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
index aaaa6a5..f4496df 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -42,6 +41,7 @@ import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.split.SplitLocationProvider;
 import org.apache.hadoop.mapred.split.TezGroupedSplit;
 import org.apache.hadoop.mapred.split.TezMapredSplitsGrouper;
 import org.apache.tez.dag.api.TaskLocationHint;
@@ -65,14 +65,13 @@ public class SplitGrouper {
 
   private final TezMapredSplitsGrouper tezGrouper = new TezMapredSplitsGrouper();
 
-
-
   /**
    * group splits for each bucket separately - while evenly filling all the
    * available slots with tasks
    */
   public Multimap<Integer, InputSplit> group(Configuration conf,
-      Multimap<Integer, InputSplit> bucketSplitMultimap, int availableSlots, float waves)
+      Multimap<Integer, InputSplit> bucketSplitMultimap, int availableSlots, float waves,
+                                             SplitLocationProvider splitLocationProvider)
       throws IOException {
 
     // figure out how many tasks we want for each bucket
@@ -90,9 +89,9 @@ public class SplitGrouper {
       InputSplit[] rawSplits = inputSplitCollection.toArray(new InputSplit[0]);
       InputSplit[] groupedSplits =
           tezGrouper.getGroupedSplits(conf, rawSplits, bucketTaskMap.get(bucketId),
-              HiveInputFormat.class.getName(), new ColumnarSplitSizeEstimator());
+              HiveInputFormat.class.getName(), new ColumnarSplitSizeEstimator(), splitLocationProvider);
 
-      LOG.info("Original split size is " + rawSplits.length + " grouped split size is "
+      LOG.info("Original split count is " + rawSplits.length + " grouped split count is "
           + groupedSplits.length + ", for bucket: " + bucketId);
 
       for (InputSplit inSplit : groupedSplits) {
@@ -155,9 +154,10 @@ public class SplitGrouper {
   public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
                                                                     Configuration conf,
                                                                     InputSplit[] splits,
-                                                                    float waves, int availableSlots)
+                                                                    float waves, int availableSlots,
+                                                                    SplitLocationProvider locationProvider)
       throws Exception {
-    return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true);
+    return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true, locationProvider);
   }
 
   /** Generate groups of splits, separated by schema evolution boundaries */
@@ -166,10 +166,12 @@ public class SplitGrouper {
                                                                     InputSplit[] splits,
                                                                     float waves, int availableSlots,
                                                                     String inputName,
-                                                                    boolean groupAcrossFiles) throws
+                                                                    boolean groupAcrossFiles,
+                                                                    SplitLocationProvider locationProvider) throws
       Exception {
 
     MapWork work = populateMapWork(jobConf, inputName);
+    // ArrayListMultimap is important here to retain the ordering for the splits.
     Multimap<Integer, InputSplit> bucketSplitMultiMap =
         ArrayListMultimap.<Integer, InputSplit> create();
 
@@ -188,7 +190,7 @@ public class SplitGrouper {
 
     // group them into the chunks we want
     Multimap<Integer, InputSplit> groupedSplits =
-        this.group(jobConf, bucketSplitMultiMap, availableSlots, waves);
+        this.group(jobConf, bucketSplitMultiMap, availableSlots, waves, locationProvider);
 
     return groupedSplits;
   }
@@ -207,6 +209,8 @@ public class SplitGrouper {
     // mapping of bucket id to number of required tasks to run
     Map<Integer, Integer> bucketTaskMap = new HashMap<Integer, Integer>();
 
+    // TODO HIVE-12255. Make use of SplitSizeEstimator.
+    // The actual task computation needs to be looked at as well.
     // compute the total size per bucket
     long totalSize = 0;
     boolean earlyExit = false;

http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
new file mode 100644
index 0000000..3eb858b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.mapred.split.SplitLocationProvider;
+import org.slf4j.Logger;
+
+public class Utils {
+  public static SplitLocationProvider getSplitLocationProvider(Configuration conf, Logger LOG) throws
+      IOException {
+    boolean useCustomLocations =
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS);
+    SplitLocationProvider splitLocationProvider;
+    LOG.info("SplitGenerator using llap affinitized locations: " + useCustomLocations);
+    if (useCustomLocations) {
+      LlapRegistryService serviceRegistry;
+      serviceRegistry = LlapRegistryService.getClient(conf);
+
+      List<ServiceInstance> serviceInstances =
+          serviceRegistry.getInstances().getAllInstancesOrdered();
+      String[] locations = new String[serviceInstances.size()];
+      int i = 0;
+      for (ServiceInstance serviceInstance : serviceInstances) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Adding " + serviceInstance.getWorkerIdentity() + " with hostname=" +
+              serviceInstance.getHost() + " to list for split locations");
+        }
+        locations[i++] = serviceInstance.getHost();
+      }
+      splitLocationProvider = new HostAffinitySplitLocationProvider(locations);
+    } else {
+      splitLocationProvider = null;
+    }
+    return splitLocationProvider;
+  }
+}

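A hypothetical call site, mirroring how HiveSplitGenerator and CustomPartitionVertex use this helper elsewhere in this commit; a null return means consistent splits are disabled.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.tez.Utils;
    import org.apache.hadoop.mapred.split.SplitLocationProvider;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ProviderCallSiteSketch {
      private static final Logger LOG = LoggerFactory.getLogger(ProviderCallSiteSketch.class);

      static SplitLocationProvider resolve(Configuration conf) throws IOException {
        // A null provider means consistent splits are disabled; grouping then
        // falls back to the locations reported by the splits themselves.
        return Utils.getSplitLocationProvider(conf, LOG);
      }
    }
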
http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
new file mode 100644
index 0000000..d98a5ff
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.junit.Test;
+
+public class TestHostAffinitySplitLocationProvider {
+
+
+  private static final String[] locations = new String[5];
+  private static final Set<String> locationsSet = new HashSet<>();
+  private static final String[] executorLocations = new String[9];
+  private static final Set<String> executorLocationsSet = new HashSet<>();
+
+  static {
+    for (int i = 0 ; i < 5 ; i++) {
+      locations[i] = "location" + i;
+      locationsSet.add(locations[i]);
+    }
+
+    for (int i = 0 ; i < 9 ; i++) {
+      executorLocations[i] = "execLocation" + i;
+      executorLocationsSet.add(executorLocations[i]);
+    }
+
+  }
+
+  @Test (timeout = 5000)
+  public void testNonFileSplits() throws IOException {
+
+    HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations);
+
+    InputSplit inputSplit1 = createMockInputSplit(new String[] {locations[0], locations[1]});
+    InputSplit inputSplit2 = createMockInputSplit(new String[] {locations[2], locations[3]});
+
+    assertArrayEquals(new String[] {locations[0], locations[1]}, locationProvider.getLocations(inputSplit1));
+    assertArrayEquals(new String[] {locations[2], locations[3]}, locationProvider.getLocations(inputSplit2));
+  }
+
+  @Test (timeout = 5000)
+  public void testOrcSplitsBasic() throws IOException {
+    HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations);
+
+    InputSplit os1 = createMockFileSplit(true, "path1", 0, 1000, new String[] {locations[0], locations[1]});
+    InputSplit os2 = createMockFileSplit(true, "path2", 0, 2000, new String[] {locations[2], locations[3]});
+    InputSplit os3 = createMockFileSplit(true, "path3", 1000, 2000, new String[] {locations[0], locations[3]});
+
+    String[] retLoc1 = locationProvider.getLocations(os1);
+    String[] retLoc2 = locationProvider.getLocations(os2);
+    String[] retLoc3 = locationProvider.getLocations(os3);
+
+    assertEquals(1, retLoc1.length);
+    assertFalse(locationsSet.contains(retLoc1[0]));
+    assertTrue(executorLocationsSet.contains(retLoc1[0]));
+
+    assertEquals(1, retLoc2.length);
+    assertFalse(locationsSet.contains(retLoc2[0]));
+    assertTrue(executorLocationsSet.contains(retLoc2[0]));
+
+    assertEquals(1, retLoc3.length);
+    assertFalse(locationsSet.contains(retLoc3[0]));
+    assertTrue(executorLocationsSet.contains(retLoc3[0]));
+  }
+
+  @Test (timeout = 5000)
+  public void testOrcSplitsLocationAffinity() throws IOException {
+    HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations);
+
+    // Same file, offset, different lengths
+    InputSplit os11 = createMockFileSplit(true, "path1", 0, 15000, new String[] {locations[0], locations[1]});
+    InputSplit os12 = createMockFileSplit(true, "path1", 0, 30000, new String[] {locations[0], locations[1]});
+    // Same file, different offset
+    InputSplit os13 = createMockFileSplit(true, "path1", 15000, 30000, new String[] {locations[0], locations[1]});
+
+    String[] retLoc11 = locationProvider.getLocations(os11);
+    String[] retLoc12 = locationProvider.getLocations(os12);
+    String[] retLoc13 = locationProvider.getLocations(os13);
+
+    assertEquals(1, retLoc11.length);
+    assertFalse(locationsSet.contains(retLoc11[0]));
+    assertTrue(executorLocationsSet.contains(retLoc11[0]));
+
+    assertEquals(1, retLoc12.length);
+    assertFalse(locationsSet.contains(retLoc12[0]));
+    assertTrue(executorLocationsSet.contains(retLoc12[0]));
+
+    assertEquals(1, retLoc13.length);
+    assertFalse(locationsSet.contains(retLoc13[0]));
+    assertTrue(executorLocationsSet.contains(retLoc13[0]));
+
+    // Verify that the actual locations are correct.
+    // os13 should map to a different location. Split locations are expected to be consistent
+    // across JVMs, and the test data is chosen so that os13 does not hash to the same host
+    // as os11 and os12. If this assertion fails because the hosts are the same, the
+    // assumption of consistency across JVM instances is likely incorrect.
+    assertEquals(retLoc11[0], retLoc12[0]);
+    assertNotEquals(retLoc11[0], retLoc13[0]);
+
+
+    // Get locations again, and make sure they're the same.
+    String[] retLoc112 = locationProvider.getLocations(os11);
+    String[] retLoc122 = locationProvider.getLocations(os12);
+    String[] retLoc132 = locationProvider.getLocations(os13);
+    assertArrayEquals(retLoc11, retLoc112);
+    assertArrayEquals(retLoc12, retLoc122);
+    assertArrayEquals(retLoc13, retLoc132);
+  }
+
+
+  private InputSplit createMockInputSplit(String[] locations) throws IOException {
+    InputSplit inputSplit = mock(InputSplit.class);
+    doReturn(locations).when(inputSplit).getLocations();
+    return inputSplit;
+  }
+
+  private InputSplit createMockFileSplit(boolean createOrcSplit, String fakePathString, long start,
+                                         long length, String[] locations) throws IOException {
+    FileSplit fileSplit;
+    if (createOrcSplit) {
+      fileSplit = mock(OrcSplit.class);
+    } else {
+      fileSplit = mock(FileSplit.class);
+    }
+
+    doReturn(start).when(fileSplit).getStart();
+    doReturn(length).when(fileSplit).getLength();
+    doReturn(new Path(fakePathString)).when(fileSplit).getPath();
+    doReturn(locations).when(fileSplit).getLocations();
+    return fileSplit;
+  }
+
+
+}