Posted to commits@hive.apache.org by ss...@apache.org on 2016/01/25 00:37:00 UTC
[8/8] hive git commit: HIVE-12470. Allow splits to provide custom consistent locations, instead of being tied to data locality. (Siddharth Seth, reviewed by Prasanth Jayachandran)
(cherry picked from commit c89b4b12e4d8fc03e64493e6c821b3bffee6f236)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5c071544
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5c071544
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5c071544
Branch: refs/heads/branch-2.0
Commit: 5c071544deead530452ed1c044bc86878802a296
Parents: 4f76d46
Author: Siddharth Seth <ss...@apache.org>
Authored: Sun Jan 24 15:25:54 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sun Jan 24 15:30:24 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 7 +-
.../hive/llap/registry/ServiceInstanceSet.java | 8 +
.../registry/impl/LlapFixedRegistryImpl.java | 21 ++-
.../llap/registry/impl/LlapRegistryService.java | 38 ++++-
.../registry/impl/LlapYarnRegistryImpl.java | 75 +++++++--
.../hive/ql/exec/tez/CustomPartitionVertex.java | 9 +-
.../hive/ql/exec/tez/HiveSplitGenerator.java | 10 +-
.../tez/HostAffinitySplitLocationProvider.java | 86 ++++++++++
.../hadoop/hive/ql/exec/tez/SplitGrouper.java | 24 +--
.../apache/hadoop/hive/ql/exec/tez/Utils.java | 58 +++++++
.../TestHostAffinitySplitLocationProvider.java | 163 +++++++++++++++++++
11 files changed, 464 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 26ba4f0..14b86e3 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2270,7 +2270,6 @@ public class HiveConf extends Configuration {
"Whether to generate the splits locally or in the AM (tez only)"),
HIVE_TEZ_GENERATE_CONSISTENT_SPLITS("hive.tez.input.generate.consistent.splits", true,
"Whether to generate consistent split locations when generating splits in the AM"),
-
HIVE_PREWARM_ENABLED("hive.prewarm.enabled", false, "Enables container prewarm for Tez/Spark (Hadoop 2 only)"),
HIVE_PREWARM_NUM_CONTAINERS("hive.prewarm.numcontainers", 10, "Controls the number of containers to prewarm for Tez/Spark (Hadoop 2 only)"),
@@ -2483,7 +2482,7 @@ public class HiveConf extends Configuration {
new TimeValidator(TimeUnit.SECONDS),
"How long to delay before cleaning up query files in LLAP (in seconds, for debugging).",
"llap.file.cleanup.delay-seconds"),
- LLAP_DAEMON_SERVICE_HOSTS("hive.llap.daemon.service.hosts", "",
+ LLAP_DAEMON_SERVICE_HOSTS("hive.llap.daemon.service.hosts", null,
"Explicitly specified hosts to use for LLAP scheduling. Useful for testing. By default,\n" +
"YARN registry is used.", "llap.daemon.service.hosts"),
LLAP_DAEMON_SERVICE_REFRESH_INTERVAL("hive.llap.daemon.service.refresh.interval.sec", "60s",
@@ -2550,6 +2549,10 @@ public class HiveConf extends Configuration {
"llap.daemon.service.port"),
LLAP_DAEMON_WEB_SSL("hive.llap.daemon.web.ssl", false,
"Whether LLAP daemon web UI should use SSL.", "llap.daemon.service.ssl"),
+ LLAP_CLIENT_CONSISTENT_SPLITS("hive.llap.client.consistent.splits",
+ false,
+ "Whether to setup split locations to match nodes on which llap daemons are running," +
+ " instead of using the locations provided by the split itself"),
SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
"60s", new TimeValidator(TimeUnit.SECONDS),
http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
index 388b5f3..be811eb 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
@@ -14,6 +14,7 @@
package org.apache.hadoop.hive.llap.registry;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -30,6 +31,13 @@ public interface ServiceInstanceSet {
public Map<String, ServiceInstance> getAll();
/**
+ * Gets a list containing all the instances. This list has the same iteration order across
+ * different processes, assuming the list of registry entries is the same.
+ * @return the instances, in a consistent order across processes
+ */
+ public List<ServiceInstance> getAllInstancesOrdered();
+
+ /**
* Get an instance by worker identity.
*
* @param name
http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index ef9de32..92044bb 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -17,8 +17,13 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -176,7 +181,8 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
private final class FixedServiceInstanceSet implements ServiceInstanceSet {
- private final Map<String, ServiceInstance> instances = new HashMap<String, ServiceInstance>();
+ // LinkedHashMap has a repeatable iteration order.
+ private final Map<String, ServiceInstance> instances = new LinkedHashMap<>();
public FixedServiceInstanceSet() {
for (String host : hosts) {
@@ -191,6 +197,19 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
}
@Override
+ public List<ServiceInstance> getAllInstancesOrdered() {
+ List<ServiceInstance> list = new LinkedList<>();
+ list.addAll(instances.values());
+ Collections.sort(list, new Comparator<ServiceInstance>() {
+ @Override
+ public int compare(ServiceInstance o1, ServiceInstance o2) {
+ return o1.getWorkerIdentity().compareTo(o2.getWorkerIdentity());
+ }
+ });
+ return list;
+ }
+
+ @Override
public ServiceInstance getInstance(String name) {
return instances.get(name);
}
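The ordering contract boils down to a sort on a stable key. A standalone sketch (hypothetical worker identities) of why this makes iteration order process-independent:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class OrderedInstancesSketch {
  public static void main(String[] args) {
    // Hypothetical worker identities, seen in arbitrary registration order.
    List<String> workerIds = new ArrayList<>(Arrays.asList("worker-0002", "worker-0000", "worker-0001"));
    // Sorting on a stable key yields the same order in every JVM,
    // provided every process sees the same registry entries.
    Collections.sort(workerIds);
    System.out.println(workerIds); // [worker-0000, worker-0001, worker-0002]
  }
}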
http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index 740f373..907faed 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -14,11 +14,13 @@
package org.apache.hadoop.hive.llap.registry.impl;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
import org.apache.hadoop.service.AbstractService;
@@ -32,11 +34,45 @@ public class LlapRegistryService extends AbstractService {
private ServiceRegistry registry = null;
private final boolean isDaemon;
+ private static final Map<String, LlapRegistryService> yarnRegistries = new HashMap<>();
+
public LlapRegistryService(boolean isDaemon) {
super("LlapRegistryService");
this.isDaemon = isDaemon;
}
+ /**
+ * Helper method to get a ServiceRegistry instance to read from the registry.
+ * This should not be used by LLAP daemons.
+ *
+ * @param conf {@link Configuration} instance which contains service registry information.
+ * @return a started {@link LlapRegistryService} client
+ */
+ public static synchronized LlapRegistryService getClient(Configuration conf) {
+ String hosts = HiveConf.getTrimmedVar(conf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+ Preconditions.checkNotNull(hosts, ConfVars.LLAP_DAEMON_SERVICE_HOSTS.toString() + " must be defined");
+ LlapRegistryService registry;
+ if (hosts.startsWith("@")) {
+ // Caching instances only in case of the YARN registry. Each host-based list will get its own copy.
+ String name = hosts.substring(1);
+ if (yarnRegistries.containsKey(name)) {
+ registry = yarnRegistries.get(name);
+ } else {
+ registry = new LlapRegistryService(false);
+ registry.init(conf);
+ registry.start();
+ yarnRegistries.put(name, registry);
+ }
+ } else {
+ registry = new LlapRegistryService(false);
+ registry.init(conf);
+ registry.start();
+ }
+ LOG.info("Using LLAP registry (client) type: " + registry);
+ return registry;
+ }
+
+
@Override
public void serviceInit(Configuration conf) {
String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
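A usage sketch for the new helper (assumes hive.llap.daemon.service.hosts is configured, and uses getInstances() the same way the Utils change below does):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;

public class RegistryClientSketch {
  public static void listDaemons(Configuration conf) throws IOException {
    // Cached per-name for '@'-prefixed (YARN registry) values; a fresh
    // instance is created for fixed host lists.
    LlapRegistryService registry = LlapRegistryService.getClient(conf);
    List<ServiceInstance> instances = registry.getInstances().getAllInstancesOrdered();
    for (ServiceInstance instance : instances) {
      System.out.println(instance.getWorkerIdentity() + " -> " + instance.getHost());
    }
  }
}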
http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
index fc2ebf2..efe31cc 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
@@ -20,15 +20,20 @@ import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
@@ -269,16 +274,47 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
// LinkedHashMap to retain iteration order.
private final Map<String, ServiceInstance> instances = new LinkedHashMap<>();
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+ private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
@Override
- public synchronized Map<String, ServiceInstance> getAll() {
+ public Map<String, ServiceInstance> getAll() {
// Return a copy. Instances may be modified during a refresh.
- return new LinkedHashMap<>(instances);
+ readLock.lock();
+ try {
+ return new LinkedHashMap<>(instances);
+ } finally {
+ readLock.unlock();
+ }
}
@Override
- public synchronized ServiceInstance getInstance(String name) {
- return instances.get(name);
+ public List<ServiceInstance> getAllInstancesOrdered() {
+ List<ServiceInstance> list = new LinkedList<>();
+ readLock.lock();
+ try {
+ list.addAll(instances.values());
+ } finally {
+ readLock.unlock();
+ }
+ Collections.sort(list, new Comparator<ServiceInstance>() {
+ @Override
+ public int compare(ServiceInstance o1, ServiceInstance o2) {
+ return o1.getWorkerIdentity().compareTo(o2.getWorkerIdentity());
+ }
+ });
+ return list;
+ }
+
+ @Override
+ public ServiceInstance getInstance(String name) {
+ readLock.lock();
+ try {
+ return instances.get(name);
+ } finally {
+ readLock.unlock();
+ }
}
@Override
@@ -290,7 +326,8 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
Map<String, ServiceRecord> records =
RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path));
// Synchronize after reading the service records from the external service (ZK)
- synchronized (this) {
+ writeLock.lock();
+ try {
Set<String> latestKeys = new HashSet<String>();
LOG.info("Starting to refresh ServiceInstanceSet " + System.identityHashCode(this));
for (ServiceRecord rec : records.values()) {
@@ -333,28 +370,34 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
} else {
this.instances.putAll(freshInstances);
}
+ } finally {
+ writeLock.unlock();
}
}
@Override
- public synchronized Set<ServiceInstance> getByHost(String host) {
+ public Set<ServiceInstance> getByHost(String host) {
// TODO Maybe store this as a map which is populated during construction, to avoid walking
// the map on each request.
+ readLock.lock();
Set<ServiceInstance> byHost = new HashSet<ServiceInstance>();
-
- for (ServiceInstance i : instances.values()) {
- if (host.equals(i.getHost())) {
- // all hosts in instances should be alive in this impl
- byHost.add(i);
+ try {
+ for (ServiceInstance i : instances.values()) {
+ if (host.equals(i.getHost())) {
+ // all hosts in instances should be alive in this impl
+ byHost.add(i);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Locality comparing " + host + " to " + i.getHost());
+ }
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Locality comparing " + host + " to " + i.getHost());
+ LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
}
+ return byHost;
+ } finally {
+ readLock.unlock();
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
- }
- return byHost;
}
}
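The locking change follows a copy-on-read pattern: readers take the read lock and return a snapshot, while refresh takes the write lock to swap contents. A minimal generic sketch of the same pattern:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class SnapshotMap<K, V> {
  private final Map<K, V> map = new LinkedHashMap<>();
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  Map<K, V> snapshot() {
    lock.readLock().lock();
    try {
      // Copy under the read lock so a concurrent refresh cannot mutate the caller's view.
      return new LinkedHashMap<>(map);
    } finally {
      lock.readLock().unlock();
    }
  }

  void replaceAll(Map<K, V> fresh) {
    lock.writeLock().lock();
    try {
      map.clear();
      map.putAll(fresh);
    } finally {
      lock.writeLock().unlock();
    }
  }
}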
http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index e9c14b1..45d3cd1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -33,6 +33,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import com.google.common.collect.LinkedListMultimap;
+import org.apache.hadoop.mapred.split.SplitLocationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -271,25 +272,27 @@ public class CustomPartitionVertex extends VertexManagerPlugin {
HashMultimap.<Integer, InputSplit> create();
boolean secondLevelGroupingDone = false;
if ((mainWorkName.isEmpty()) || (inputName.compareTo(mainWorkName) == 0)) {
+ SplitLocationProvider splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG);
for (Integer key : bucketToInitialSplitMap.keySet()) {
InputSplit[] inputSplitArray =
(bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
Multimap<Integer, InputSplit> groupedSplit =
grouper.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
- availableSlots, inputName, mainWorkName.isEmpty());
+ availableSlots, inputName, mainWorkName.isEmpty(), splitLocationProvider);
if (mainWorkName.isEmpty() == false) {
Multimap<Integer, InputSplit> singleBucketToGroupedSplit =
HashMultimap.<Integer, InputSplit> create();
singleBucketToGroupedSplit.putAll(key, groupedSplit.values());
groupedSplit =
grouper.group(jobConf, singleBucketToGroupedSplit, availableSlots,
- HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES));
+ HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES), null);
secondLevelGroupingDone = true;
}
bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
}
processAllEvents(inputName, bucketToGroupedSplitMap, secondLevelGroupingDone);
} else {
+ SplitLocationProvider splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG);
// do not group across files in case of side work because there is only 1 KV reader per
// grouped split. This would affect SMB joins where we want to find the smallest key in
// all the bucket files.
@@ -298,7 +301,7 @@ public class CustomPartitionVertex extends VertexManagerPlugin {
(bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
Multimap<Integer, InputSplit> groupedSplit =
grouper.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
- availableSlots, inputName, false);
+ availableSlots, inputName, false, splitLocationProvider);
bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
}
/*
http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 8ebfe69..8e48c2e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.split.SplitLocationProvider;
import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezUtils;
@@ -79,6 +80,7 @@ public class HiveSplitGenerator extends InputInitializer {
private final MRInputUserPayloadProto userPayloadProto;
private final MapWork work;
private final SplitGrouper splitGrouper = new SplitGrouper();
+ private final SplitLocationProvider splitLocationProvider;
public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException,
SerDeException {
@@ -91,6 +93,9 @@ public class HiveSplitGenerator extends InputInitializer {
this.jobConf = new JobConf(conf);
+ this.splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG);
+ LOG.info("SplitLocationProvider: " + splitLocationProvider);
+
// Read all credentials into the credentials instance stored in JobConf.
ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
@@ -149,6 +154,7 @@ public class HiveSplitGenerator extends InputInitializer {
conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
+ // Raw splits
InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
// Sort the splits, so that subsequent grouping is consistent.
Arrays.sort(splits, new InputSplitComparator());
@@ -160,10 +166,10 @@ public class HiveSplitGenerator extends InputInitializer {
}
Multimap<Integer, InputSplit> groupedSplits =
- splitGrouper.generateGroupedSplits(jobConf, conf, splits, waves, availableSlots);
+ splitGrouper.generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, splitLocationProvider);
// And finally return them in a flat array
InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]);
- LOG.info("Number of grouped splits: " + flatSplits.length);
+ LOG.info("Number of split groups: " + flatSplits.length);
List<TaskLocationHint> locationHints = splitGrouper.createTaskLocationHints(flatSplits, generateConsistentSplits);
http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
new file mode 100644
index 0000000..c06499e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.split.SplitLocationProvider;
+import org.apache.hive.common.util.Murmur3;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This maps a split (path + offset) to an index based on the number of locations provided.
+ *
+ * If locations do not change across jobs, the intention is to map the same split to the same node.
+ *
+ * A big problem arises when nodes change (are added, removed, or temporarily removed and re-added):
+ * that changes the number and position of locations, and will invalidate the cache almost completely.
+ *
+ * TODO: Support for consistent hashing when combining the split location generator and the ServiceRegistry.
+ *
+ */
+public class HostAffinitySplitLocationProvider implements SplitLocationProvider {
+
+ private final Logger LOG = LoggerFactory.getLogger(HostAffinitySplitLocationProvider.class);
+ private final boolean isDebugEnabled = LOG.isDebugEnabled();
+
+ private final String[] knownLocations;
+
+ public HostAffinitySplitLocationProvider(String[] knownLocations) {
+ Preconditions.checkState(knownLocations != null && knownLocations.length != 0,
+ HostAffinitySplitLocationProvider.class.getName() +
+ "needs at least 1 location to function");
+ this.knownLocations = knownLocations;
+ }
+
+ @Override
+ public String[] getLocations(InputSplit split) throws IOException {
+ if (split instanceof FileSplit) {
+ FileSplit fsplit = (FileSplit) split;
+ long hash = generateHash(fsplit.getPath().toString(), fsplit.getStart());
+ int indexRaw = (int) (hash % knownLocations.length);
+ int index = Math.abs(indexRaw);
+ if (isDebugEnabled) {
+ LOG.debug(
+ "Split at " + fsplit.getPath() + " with offset= " + fsplit.getStart() + ", length=" +
+ fsplit.getLength() + " mapped to index=" + index + ", location=" +
+ knownLocations[index]);
+ }
+ return new String[]{knownLocations[index]};
+ } else {
+ if (isDebugEnabled) {
+ LOG.debug("Split: " + split + " is not a FileSplit. Using default locations");
+ }
+ return split.getLocations();
+ }
+ }
+
+ private long generateHash(String path, long startOffset) throws IOException {
+ // Explicitly using only the start offset of a split, and not the length.
+ // Splits generated on block boundaries and stripe boundaries can vary slightly, so both are hashed to the same node.
+ // The drawback is that the same data can end up hashed to multiple nodes: a large split
+ // is sent to one node, and a second invocation that uses smaller chunks of the previous large split sends them
+ // to different nodes.
+ DataOutputBuffer dob = new DataOutputBuffer();
+ dob.writeLong(startOffset);
+ dob.writeUTF(path);
+ return Murmur3.hash64(dob.getData(), 0, dob.getLength());
+ }
+}
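A usage sketch (hypothetical hosts and path) showing that the same (path, start offset) always maps to the same host, regardless of split length:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;

public class HostAffinitySketch {
  public static void main(String[] args) throws IOException {
    String[] daemonHosts = {"llap-node-0", "llap-node-1", "llap-node-2"};
    HostAffinitySplitLocationProvider provider =
        new HostAffinitySplitLocationProvider(daemonHosts);

    // Same file and offset, different lengths: both hash to the same host,
    // since generateHash uses only (path, start).
    FileSplit split1 = new FileSplit(new Path("/warehouse/t1/000000_0"), 0, 1000, new String[0]);
    FileSplit split2 = new FileSplit(new Path("/warehouse/t1/000000_0"), 0, 2000, new String[0]);

    System.out.println(provider.getLocations(split1)[0]); // same host ...
    System.out.println(provider.getLocations(split2)[0]); // ... as this one
  }
}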
http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
index aaaa6a5..f4496df 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -42,6 +41,7 @@ import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.split.SplitLocationProvider;
import org.apache.hadoop.mapred.split.TezGroupedSplit;
import org.apache.hadoop.mapred.split.TezMapredSplitsGrouper;
import org.apache.tez.dag.api.TaskLocationHint;
@@ -65,14 +65,13 @@ public class SplitGrouper {
private final TezMapredSplitsGrouper tezGrouper = new TezMapredSplitsGrouper();
-
-
/**
* group splits for each bucket separately - while evenly filling all the
* available slots with tasks
*/
public Multimap<Integer, InputSplit> group(Configuration conf,
- Multimap<Integer, InputSplit> bucketSplitMultimap, int availableSlots, float waves)
+ Multimap<Integer, InputSplit> bucketSplitMultimap, int availableSlots, float waves,
+ SplitLocationProvider splitLocationProvider)
throws IOException {
// figure out how many tasks we want for each bucket
@@ -90,9 +89,9 @@ public class SplitGrouper {
InputSplit[] rawSplits = inputSplitCollection.toArray(new InputSplit[0]);
InputSplit[] groupedSplits =
tezGrouper.getGroupedSplits(conf, rawSplits, bucketTaskMap.get(bucketId),
- HiveInputFormat.class.getName(), new ColumnarSplitSizeEstimator());
+ HiveInputFormat.class.getName(), new ColumnarSplitSizeEstimator(), splitLocationProvider);
- LOG.info("Original split size is " + rawSplits.length + " grouped split size is "
+ LOG.info("Original split count is " + rawSplits.length + " grouped split count is "
+ groupedSplits.length + ", for bucket: " + bucketId);
for (InputSplit inSplit : groupedSplits) {
@@ -155,9 +154,10 @@ public class SplitGrouper {
public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
Configuration conf,
InputSplit[] splits,
- float waves, int availableSlots)
+ float waves, int availableSlots,
+ SplitLocationProvider locationProvider)
throws Exception {
- return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true);
+ return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true, locationProvider);
}
/** Generate groups of splits, separated by schema evolution boundaries */
@@ -166,10 +166,12 @@ public class SplitGrouper {
InputSplit[] splits,
float waves, int availableSlots,
String inputName,
- boolean groupAcrossFiles) throws
+ boolean groupAcrossFiles,
+ SplitLocationProvider locationProvider) throws
Exception {
MapWork work = populateMapWork(jobConf, inputName);
+ // ArrayListMultimap is important here to retain the ordering for the splits.
Multimap<Integer, InputSplit> bucketSplitMultiMap =
ArrayListMultimap.<Integer, InputSplit> create();
@@ -188,7 +190,7 @@ public class SplitGrouper {
// group them into the chunks we want
Multimap<Integer, InputSplit> groupedSplits =
- this.group(jobConf, bucketSplitMultiMap, availableSlots, waves);
+ this.group(jobConf, bucketSplitMultiMap, availableSlots, waves, locationProvider);
return groupedSplits;
}
@@ -207,6 +209,8 @@ public class SplitGrouper {
// mapping of bucket id to number of required tasks to run
Map<Integer, Integer> bucketTaskMap = new HashMap<Integer, Integer>();
+ // TODO HIVE-12255. Make use of SplitSizeEstimator.
+ // The actual task computation needs to be looked at as well.
// compute the total size per bucket
long totalSize = 0;
boolean earlyExit = false;
http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
new file mode 100644
index 0000000..3eb858b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.mapred.split.SplitLocationProvider;
+import org.slf4j.Logger;
+
+public class Utils {
+ public static SplitLocationProvider getSplitLocationProvider(Configuration conf, Logger LOG) throws
+ IOException {
+ boolean useCustomLocations =
+ HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS);
+ SplitLocationProvider splitLocationProvider;
+ LOG.info("SplitGenerator using llap affinitized locations: " + useCustomLocations);
+ if (useCustomLocations) {
+ LlapRegistryService serviceRegistry;
+ serviceRegistry = LlapRegistryService.getClient(conf);
+
+ List<ServiceInstance> serviceInstances =
+ serviceRegistry.getInstances().getAllInstancesOrdered();
+ String[] locations = new String[serviceInstances.size()];
+ int i = 0;
+ for (ServiceInstance serviceInstance : serviceInstances) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding " + serviceInstance.getWorkerIdentity() + " with hostname=" +
+ serviceInstance.getHost() + " to list for split locations");
+ }
+ locations[i++] = serviceInstance.getHost();
+ }
+ splitLocationProvider = new HostAffinitySplitLocationProvider(locations);
+ } else {
+ splitLocationProvider = null;
+ }
+ return splitLocationProvider;
+ }
+}
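For context, the wiring mirrors HiveSplitGenerator above; a sketch, with the null fallback spelled out:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.split.SplitLocationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SplitLocationWiringSketch {
  private static final Logger LOG = LoggerFactory.getLogger(SplitLocationWiringSketch.class);

  public static SplitLocationProvider resolve(Configuration conf) throws IOException {
    // Returns a HostAffinitySplitLocationProvider over the ordered daemon hosts when
    // hive.llap.client.consistent.splits is true; null otherwise, in which case the
    // grouper is expected to fall back to the locations each split reports itself.
    return Utils.getSplitLocationProvider(conf, LOG);
  }
}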
http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
new file mode 100644
index 0000000..d98a5ff
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.junit.Test;
+
+public class TestHostAffinitySplitLocationProvider {
+
+
+ private static final String[] locations = new String[5];
+ private static final Set<String> locationsSet = new HashSet<>();
+ private static final String[] executorLocations = new String[9];
+ private static final Set<String> executorLocationsSet = new HashSet<>();
+
+ static {
+ for (int i = 0 ; i < 5 ; i++) {
+ locations[i] = "location" + i;
+ locationsSet.add(locations[i]);
+ }
+
+ for (int i = 0 ; i < 9 ; i++) {
+ executorLocations[i] = "execLocation" + i;
+ executorLocationsSet.add(executorLocations[i]);
+ }
+
+ }
+
+ @Test (timeout = 5000)
+ public void testNonFileSplits() throws IOException {
+
+ HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations);
+
+ InputSplit inputSplit1 = createMockInputSplit(new String[] {locations[0], locations[1]});
+ InputSplit inputSplit2 = createMockInputSplit(new String[] {locations[2], locations[3]});
+
+ assertArrayEquals(new String[] {locations[0], locations[1]}, locationProvider.getLocations(inputSplit1));
+ assertArrayEquals(new String[] {locations[2], locations[3]}, locationProvider.getLocations(inputSplit2));
+ }
+
+ @Test (timeout = 5000)
+ public void testOrcSplitsBasic() throws IOException {
+ HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations);
+
+ InputSplit os1 = createMockFileSplit(true, "path1", 0, 1000, new String[] {locations[0], locations[1]});
+ InputSplit os2 = createMockFileSplit(true, "path2", 0, 2000, new String[] {locations[2], locations[3]});
+ InputSplit os3 = createMockFileSplit(true, "path3", 1000, 2000, new String[] {locations[0], locations[3]});
+
+ String[] retLoc1 = locationProvider.getLocations(os1);
+ String[] retLoc2 = locationProvider.getLocations(os2);
+ String[] retLoc3 = locationProvider.getLocations(os3);
+
+ assertEquals(1, retLoc1.length);
+ assertFalse(locationsSet.contains(retLoc1[0]));
+ assertTrue(executorLocationsSet.contains(retLoc1[0]));
+
+ assertEquals(1, retLoc2.length);
+ assertFalse(locationsSet.contains(retLoc2[0]));
+ assertTrue(executorLocationsSet.contains(retLoc2[0]));
+
+ assertEquals(1, retLoc3.length);
+ assertFalse(locationsSet.contains(retLoc3[0]));
+ assertTrue(executorLocationsSet.contains(retLoc3[0]));
+ }
+
+ @Test (timeout = 5000)
+ public void testOrcSplitsLocationAffinity() throws IOException {
+ HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations);
+
+ // Same file, offset, different lengths
+ InputSplit os11 = createMockFileSplit(true, "path1", 0, 15000, new String[] {locations[0], locations[1]});
+ InputSplit os12 = createMockFileSplit(true, "path1", 0, 30000, new String[] {locations[0], locations[1]});
+ // Same file, different offset
+ InputSplit os13 = createMockFileSplit(true, "path1", 15000, 30000, new String[] {locations[0], locations[1]});
+
+ String[] retLoc11 = locationProvider.getLocations(os11);
+ String[] retLoc12 = locationProvider.getLocations(os12);
+ String[] retLoc13 = locationProvider.getLocations(os13);
+
+ assertEquals(1, retLoc11.length);
+ assertFalse(locationsSet.contains(retLoc11[0]));
+ assertTrue(executorLocationsSet.contains(retLoc11[0]));
+
+ assertEquals(1, retLoc12.length);
+ assertFalse(locationsSet.contains(retLoc12[0]));
+ assertTrue(executorLocationsSet.contains(retLoc12[0]));
+
+ assertEquals(1, retLoc13.length);
+ assertFalse(locationsSet.contains(retLoc13[0]));
+ assertTrue(executorLocationsSet.contains(retLoc13[0]));
+
+ // Verify that the actual locations are correct.
+ // os13 should map to a different location. The hash is supposed to be consistent across JVMs,
+ // and the test is set up so that os13 does not hash to the same host as os11 and os12.
+ // If this assertion fails because the hosts are the same, the assumption about consistency
+ // across JVM instances is likely incorrect.
+ assertEquals(retLoc11[0], retLoc12[0]);
+ assertNotEquals(retLoc11[0], retLoc13[0]);
+
+
+ // Get locations again, and make sure they're the same.
+ String[] retLoc112 = locationProvider.getLocations(os11);
+ String[] retLoc122 = locationProvider.getLocations(os12);
+ String[] retLoc132 = locationProvider.getLocations(os13);
+ assertArrayEquals(retLoc11, retLoc112);
+ assertArrayEquals(retLoc12, retLoc122);
+ assertArrayEquals(retLoc13, retLoc132);
+ }
+
+
+ private InputSplit createMockInputSplit(String[] locations) throws IOException {
+ InputSplit inputSplit = mock(InputSplit.class);
+ doReturn(locations).when(inputSplit).getLocations();
+ return inputSplit;
+ }
+
+ private InputSplit createMockFileSplit(boolean createOrcSplit, String fakePathString, long start,
+ long length, String[] locations) throws IOException {
+ FileSplit fileSplit;
+ if (createOrcSplit) {
+ fileSplit = mock(OrcSplit.class);
+ } else {
+ fileSplit = mock(FileSplit.class);
+ }
+
+ doReturn(start).when(fileSplit).getStart();
+ doReturn(length).when(fileSplit).getLength();
+ doReturn(new Path(fakePathString)).when(fileSplit).getPath();
+ doReturn(locations).when(fileSplit).getLocations();
+
+ return fileSplit;
+ }
+
+
+}