You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by go...@apache.org on 2016/11/15 22:52:57 UTC
hive git commit: HIVE-15125: LLAP: Parallelize slider package generator (Gopal V, reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master a4a00b20c -> aa7c9cd61
HIVE-15125: LLAP: Parallelize slider package generator (Gopal V, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aa7c9cd6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aa7c9cd6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aa7c9cd6
Branch: refs/heads/master
Commit: aa7c9cd614804c0bf683745614f7a2b264ce72bf
Parents: a4a00b2
Author: Gopal V <go...@apache.org>
Authored: Tue Nov 15 14:52:26 2016 -0800
Committer: Gopal V <go...@apache.org>
Committed: Tue Nov 15 14:52:26 2016 -0800
----------------------------------------------------------------------
.../hadoop/hive/llap/cli/LlapServiceDriver.java | 685 +++++++++++--------
1 file changed, 390 insertions(+), 295 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/aa7c9cd6/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index 6f533df..dfd2f7b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -34,6 +34,13 @@ import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
@@ -71,6 +78,7 @@ import org.eclipse.jetty.server.ssl.SslSocketConnector;
import org.json.JSONObject;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class LlapServiceDriver {
@@ -153,11 +161,21 @@ public class LlapServiceDriver {
}
}
+ private static abstract class NamedCallable<T> implements Callable<T> { // Callable that also carries a human-readable task name; concrete subclasses supply call()
+ public final String taskName; // label fixed at construction; returned by getName()
+ public NamedCallable (String name) { // name: label to store for this task
+ this.taskName = name;
+ }
+ public String getName() { // accessor for the label; run() prints it in the per-task debug timing message
+ return taskName;
+ }
+ }
+
private void run(String[] args) throws Exception {
LlapOptionsProcessor optionsProcessor = new LlapOptionsProcessor();
- LlapOptions options = optionsProcessor.processOptions(args);
+ final LlapOptions options = optionsProcessor.processOptions(args);
- Properties propsDirectOptions = new Properties();
+ final Properties propsDirectOptions = new Properties();
if (options == null) {
// help
@@ -171,346 +189,418 @@ public class LlapServiceDriver {
throw new Exception("Cannot load any configuration to run command");
}
- FileSystem fs = FileSystem.get(conf);
- FileSystem lfs = FileSystem.getLocal(conf).getRawFileSystem();
-
- // needed so that the file is actually loaded into configuration.
- for (String f : NEEDED_CONFIGS) {
- conf.addResource(f);
- if (conf.getResource(f) == null) {
- throw new Exception("Unable to find required config file: " + f);
- }
- }
- for (String f : OPTIONAL_CONFIGS) {
- conf.addResource(f);
- }
-
- conf.reloadConfiguration();
+ final long t0 = System.nanoTime();
- populateConfWithLlapProperties(conf, options.getConfig());
+ final FileSystem fs = FileSystem.get(conf);
+ final FileSystem lfs = FileSystem.getLocal(conf).getRawFileSystem();
+ final ExecutorService executor =
+ Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2,
+ new ThreadFactoryBuilder().setNameFormat("llap-pkg-%d").build());
+ final CompletionService<Void> asyncRunner = new ExecutorCompletionService<Void>(executor);
- if (options.getName() != null) {
- // update service registry configs - caveat: this has nothing to do with the actual settings
- // as read by the AM
- // if needed, use --hiveconf llap.daemon.service.hosts=@llap0 to dynamically switch between
- // instances
- conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + options.getName());
- propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname,
- "@" + options.getName());
- }
-
- if (options.getLogger() != null) {
- HiveConf.setVar(conf, ConfVars.LLAP_DAEMON_LOGGER, options.getLogger());
- propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_LOGGER.varname, options.getLogger());
- }
+ try {
- if (options.getSize() != -1) {
- if (options.getCache() != -1) {
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED) == false) {
- // direct heap allocations need to be safer
- Preconditions.checkArgument(options.getCache() < options.getSize(),
- "Cache size (" + humanReadableByteCount(options.getCache()) + ") has to be smaller" +
- " than the container sizing (" + humanReadableByteCount(options.getSize()) + ")");
- } else if (options.getCache() < options.getSize()) {
- LOG.warn("Note that this might need YARN physical memory monitoring to be turned off "
- + "(yarn.nodemanager.pmem-check-enabled=false)");
+ // needed so that the file is actually loaded into configuration.
+ for (String f : NEEDED_CONFIGS) {
+ conf.addResource(f);
+ if (conf.getResource(f) == null) {
+ throw new Exception("Unable to find required config file: " + f);
}
}
- if (options.getXmx() != -1) {
- Preconditions.checkArgument(options.getXmx() < options.getSize(),
- "Working memory (Xmx=" + humanReadableByteCount(options.getXmx()) + ") has to be" +
- " smaller than the container sizing (" +
- humanReadableByteCount(options.getSize()) + ")");
- }
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)
- && false == HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
- // direct and not memory mapped
- Preconditions.checkArgument(options.getXmx() + options.getCache() < options.getSize(),
- "Working memory + cache (Xmx="+ humanReadableByteCount(options.getXmx()) +
- " + cache=" + humanReadableByteCount(options.getCache()) + ")"
- + " has to be smaller than the container sizing (" +
- humanReadableByteCount(options.getSize()) + ")");
+ for (String f : OPTIONAL_CONFIGS) {
+ conf.addResource(f);
}
- }
- // This parameter is read in package.py - and nowhere else. Does not need to be part of HiveConf - that's just confusing.
- final long minAlloc = conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1);
- long containerSize = -1;
- if (options.getSize() != -1) {
- containerSize = options.getSize() / (1024 * 1024);
- Preconditions.checkArgument(containerSize >= minAlloc,
- "Container size (" + humanReadableByteCount(options.getSize()) + ") should be greater" +
- " than minimum allocation(" + humanReadableByteCount(minAlloc * 1024L * 1024L) + ")");
- conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
- propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, String.valueOf(containerSize));
- }
+ conf.reloadConfiguration();
- if (options.getExecutors() != -1) {
- conf.setLong(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, options.getExecutors());
- propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, String.valueOf(options.getExecutors()));
- // TODO: vcpu settings - possibly when DRFA works right
- }
+ populateConfWithLlapProperties(conf, options.getConfig());
- if (options.getIoThreads() != -1) {
- conf.setLong(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname, options.getIoThreads());
- propsDirectOptions.setProperty(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname,
- String.valueOf(options.getIoThreads()));
- }
+ if (options.getName() != null) {
+ // update service registry configs - caveat: this has nothing to do with the actual settings
+ // as read by the AM
+ // if needed, use --hiveconf llap.daemon.service.hosts=@llap0 to dynamically switch between
+ // instances
+ conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + options.getName());
+ propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname,
+ "@" + options.getName());
+ }
- long cache = -1, xmx = -1;
- if (options.getCache() != -1) {
- cache = options.getCache();
- conf.set(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, Long.toString(cache));
- propsDirectOptions.setProperty(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
- Long.toString(cache));
- }
+ if (options.getLogger() != null) {
+ HiveConf.setVar(conf, ConfVars.LLAP_DAEMON_LOGGER, options.getLogger());
+ propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_LOGGER.varname, options.getLogger());
+ }
- if (options.getXmx() != -1) {
- // Needs more explanation here
- // Xmx is not the max heap value in JDK8. You need to subtract 50% of the survivor fraction
- // from this, to get actual usable memory before it goes into GC
- xmx = options.getXmx();
- long xmxMb = (long)(xmx / (1024 * 1024));
- conf.setLong(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, xmxMb);
- propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
- String.valueOf(xmxMb));
- }
+ if (options.getSize() != -1) {
+ if (options.getCache() != -1) {
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED) == false) {
+ // direct heap allocations need to be safer
+ Preconditions.checkArgument(options.getCache() < options.getSize(), "Cache size ("
+ + humanReadableByteCount(options.getCache()) + ") has to be smaller"
+ + " than the container sizing (" + humanReadableByteCount(options.getSize()) + ")");
+ } else if (options.getCache() < options.getSize()) {
+ LOG.warn("Note that this might need YARN physical memory monitoring to be turned off "
+ + "(yarn.nodemanager.pmem-check-enabled=false)");
+ }
+ }
+ if (options.getXmx() != -1) {
+ Preconditions.checkArgument(options.getXmx() < options.getSize(), "Working memory (Xmx="
+ + humanReadableByteCount(options.getXmx()) + ") has to be"
+ + " smaller than the container sizing (" + humanReadableByteCount(options.getSize())
+ + ")");
+ }
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)
+ && false == HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
+ // direct and not memory mapped
+ Preconditions.checkArgument(options.getXmx() + options.getCache() < options.getSize(),
+ "Working memory + cache (Xmx=" + humanReadableByteCount(options.getXmx())
+ + " + cache=" + humanReadableByteCount(options.getCache()) + ")"
+ + " has to be smaller than the container sizing ("
+ + humanReadableByteCount(options.getSize()) + ")");
+ }
+ }
- if (options.getLlapQueueName() != null && !options.getLlapQueueName().isEmpty()) {
- conf.set(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, options.getLlapQueueName());
- propsDirectOptions
- .setProperty(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, options.getLlapQueueName());
- }
+ // This parameter is read in package.py - and nowhere else. Does not need to be part of
+ // HiveConf - that's just confusing.
+ final long minAlloc = conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1);
+ long containerSize = -1;
+ if (options.getSize() != -1) {
+ containerSize = options.getSize() / (1024 * 1024);
+ Preconditions.checkArgument(containerSize >= minAlloc, "Container size ("
+ + humanReadableByteCount(options.getSize()) + ") should be greater"
+ + " than minimum allocation(" + humanReadableByteCount(minAlloc * 1024L * 1024L) + ")");
+ conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
+ propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
+ String.valueOf(containerSize));
+ }
- URL logger = conf.getResource(LlapConstants.LOG4j2_PROPERTIES_FILE);
+ if (options.getExecutors() != -1) {
+ conf.setLong(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, options.getExecutors());
+ propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
+ String.valueOf(options.getExecutors()));
+ // TODO: vcpu settings - possibly when DRFA works right
+ }
- if (null == logger) {
- throw new Exception("Unable to find required config file: llap-daemon-log4j2.properties");
- }
+ if (options.getIoThreads() != -1) {
+ conf.setLong(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname, options.getIoThreads());
+ propsDirectOptions.setProperty(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname,
+ String.valueOf(options.getIoThreads()));
+ }
- Path home = new Path(System.getenv("HIVE_HOME"));
- Path scripts = new Path(new Path(new Path(home, "scripts"), "llap"), "bin");
+ long cache = -1, xmx = -1;
+ if (options.getCache() != -1) {
+ cache = options.getCache();
+ conf.set(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, Long.toString(cache));
+ propsDirectOptions.setProperty(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
+ Long.toString(cache));
+ }
- if (!lfs.exists(home)) {
- throw new Exception("Unable to find HIVE_HOME:" + home);
- } else if (!lfs.exists(scripts)) {
- LOG.warn("Unable to find llap scripts:" + scripts);
- }
+ if (options.getXmx() != -1) {
+ // Needs more explanation here
+ // Xmx is not the max heap value in JDK8. You need to subtract 50% of the survivor fraction
+ // from this, to get actual usable memory before it goes into GC
+ xmx = options.getXmx();
+ long xmxMb = (long) (xmx / (1024 * 1024));
+ conf.setLong(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, xmxMb);
+ propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
+ String.valueOf(xmxMb));
+ }
+ if (options.getLlapQueueName() != null && !options.getLlapQueueName().isEmpty()) {
+ conf.set(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, options.getLlapQueueName());
+ propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname,
+ options.getLlapQueueName());
+ }
- Path libDir = new Path(tmpDir, "lib");
- Path tezDir = new Path(libDir, "tez");
- Path udfDir = new Path(libDir, "udfs");
+ final URL logger = conf.getResource(LlapConstants.LOG4j2_PROPERTIES_FILE);
- String tezLibs = conf.get(TezConfiguration.TEZ_LIB_URIS);
- if (tezLibs == null) {
- LOG.warn("Missing tez.lib.uris in tez-site.xml");
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Copying tez libs from " + tezLibs);
- }
- lfs.mkdirs(tezDir);
- fs.copyToLocalFile(new Path(tezLibs), new Path(libDir, "tez.tar.gz"));
- CompressionUtils.unTar(new Path(libDir, "tez.tar.gz").toString(), tezDir.toString(), true);
- lfs.delete(new Path(libDir, "tez.tar.gz"), false);
-
- Class<?>[] dependencies = new Class<?>[] {
- LlapDaemonProtocolProtos.class, // llap-common
- LlapTezUtils.class, // llap-tez
- LlapInputFormat.class, // llap-server
- HiveInputFormat.class, // hive-exec
- SslSocketConnector.class, // hive-common (https deps)
- RegistryUtils.ServiceRecordMarshal.class, // ZK registry
- // log4j2
- com.lmax.disruptor.RingBuffer.class, // disruptor
- org.apache.logging.log4j.Logger.class, // log4j-api
- org.apache.logging.log4j.core.Appender.class, // log4j-core
- org.apache.logging.slf4j.Log4jLogger.class, // log4j-slf4j
- // log4j-1.2-API needed for NDC
- org.apache.log4j.NDC.class,
- };
-
- for (Class<?> c : dependencies) {
- Path jarPath = new Path(Utilities.jarFinderGetJar(c));
- lfs.copyFromLocalFile(jarPath, libDir);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Copying " + jarPath + " to " + libDir);
+ if (null == logger) {
+ throw new Exception("Unable to find required config file: llap-daemon-log4j2.properties");
}
- }
+ Path home = new Path(System.getenv("HIVE_HOME"));
+ Path scripts = new Path(new Path(new Path(home, "scripts"), "llap"), "bin");
- // copy default aux classes (json/hbase)
-
- for (String className : DEFAULT_AUX_CLASSES) {
- localizeJarForClass(lfs, libDir, className, false);
- }
- Collection<String> codecs = conf.getStringCollection("io.compression.codecs");
- if (codecs != null) {
- for (String codecClassName : codecs) {
- localizeJarForClass(lfs, libDir, codecClassName, false);
+ if (!lfs.exists(home)) {
+ throw new Exception("Unable to find HIVE_HOME:" + home);
+ } else if (!lfs.exists(scripts)) {
+ LOG.warn("Unable to find llap scripts:" + scripts);
}
- }
- if (options.getIsHBase()) {
- try {
- localizeJarForClass(lfs, libDir, HBASE_SERDE_CLASS, true);
- Job fakeJob = new Job(new JobConf()); // HBase API is convoluted.
- TableMapReduceUtil.addDependencyJars(fakeJob);
- Collection<String> hbaseJars = fakeJob.getConfiguration().getStringCollection("tmpjars");
- for (String jarPath : hbaseJars) {
- if (!jarPath.isEmpty()) {
- lfs.copyFromLocalFile(new Path(jarPath), libDir);
+ final Path libDir = new Path(tmpDir, "lib");
+ final Path tezDir = new Path(libDir, "tez");
+ final Path udfDir = new Path(libDir, "udfs");
+ final Path confPath = new Path(tmpDir, "conf");
+ lfs.mkdirs(confPath);
+
+ NamedCallable<Void> downloadTez = new NamedCallable<Void>("downloadTez") {
+ @Override
+ public Void call() throws Exception {
+ synchronized (fs) {
+ String tezLibs = conf.get(TezConfiguration.TEZ_LIB_URIS);
+ if (tezLibs == null) {
+ LOG.warn("Missing tez.lib.uris in tez-site.xml");
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Copying tez libs from " + tezLibs);
+ }
+ lfs.mkdirs(tezDir);
+ fs.copyToLocalFile(new Path(tezLibs), new Path(libDir, "tez.tar.gz"));
+ CompressionUtils.unTar(new Path(libDir, "tez.tar.gz").toString(), tezDir.toString(),
+ true);
+ lfs.delete(new Path(libDir, "tez.tar.gz"), false);
}
+ return null;
}
- } catch (Throwable t) {
- String err = "Failed to add HBase jars. Use --auxhbase=false to avoid localizing them";
- LOG.error(err);
- System.err.println(err);
- throw new RuntimeException(t);
- }
- }
-
- String auxJars = options.getAuxJars();
- if (auxJars != null && !auxJars.isEmpty()) {
- // TODO: transitive dependencies warning?
- String[] jarPaths = auxJars.split(",");
- for (String jarPath : jarPaths) {
- if (!jarPath.isEmpty()) {
- lfs.copyFromLocalFile(new Path(jarPath), libDir);
+ };
+
+ NamedCallable<Void> copyLocalJars = new NamedCallable<Void>("copyLocalJars") {
+ @Override
+ public Void call() throws Exception {
+ Class<?>[] dependencies = new Class<?>[] { LlapDaemonProtocolProtos.class, // llap-common
+ LlapTezUtils.class, // llap-tez
+ LlapInputFormat.class, // llap-server
+ HiveInputFormat.class, // hive-exec
+ SslSocketConnector.class, // hive-common (https deps)
+ RegistryUtils.ServiceRecordMarshal.class, // ZK registry
+ // log4j2
+ com.lmax.disruptor.RingBuffer.class, // disruptor
+ org.apache.logging.log4j.Logger.class, // log4j-api
+ org.apache.logging.log4j.core.Appender.class, // log4j-core
+ org.apache.logging.slf4j.Log4jLogger.class, // log4j-slf4j
+ // log4j-1.2-API needed for NDC
+ org.apache.log4j.NDC.class, };
+
+ for (Class<?> c : dependencies) {
+ Path jarPath = new Path(Utilities.jarFinderGetJar(c));
+ lfs.copyFromLocalFile(jarPath, libDir);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Copying " + jarPath + " to " + libDir);
+ }
+ }
+ return null;
}
- }
- }
+ };
- // UDFs
- final Set<String> allowedUdfs;
-
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOW_PERMANENT_FNS)) {
- allowedUdfs = downloadPermanentFunctions(conf, udfDir);
- } else {
- allowedUdfs = Collections.emptySet();
- }
+ // copy default aux classes (json/hbase)
- String java_home;
- if (options.getJavaPath() == null || options.getJavaPath().isEmpty()) {
- java_home = System.getenv("JAVA_HOME");
- String jre_home = System.getProperty("java.home");
- if (java_home == null) {
- java_home = jre_home;
- } else if (!java_home.equals(jre_home)) {
- LOG.warn("Java versions might not match : JAVA_HOME=[{}],process jre=[{}]",
- java_home, jre_home);
- }
- } else {
- java_home = options.getJavaPath();
- }
- if (java_home == null || java_home.isEmpty()) {
- throw new RuntimeException(
- "Could not determine JAVA_HOME from command line parameters, environment or system properties");
- }
- LOG.info("Using [{}] for JAVA_HOME", java_home);
+ NamedCallable<Void> copyAuxJars = new NamedCallable<Void>("copyAuxJars") {
+ @Override
+ public Void call() throws Exception {
+ for (String className : DEFAULT_AUX_CLASSES) {
+ localizeJarForClass(lfs, libDir, className, false);
+ }
+ Collection<String> codecs = conf.getStringCollection("io.compression.codecs");
+ if (codecs != null) {
+ for (String codecClassName : codecs) {
+ localizeJarForClass(lfs, libDir, codecClassName, false);
+ }
+ }
- Path confPath = new Path(tmpDir, "conf");
- lfs.mkdirs(confPath);
+ if (options.getIsHBase()) {
+ try {
+ localizeJarForClass(lfs, libDir, HBASE_SERDE_CLASS, true);
+ Job fakeJob = new Job(new JobConf()); // HBase API is convoluted.
+ TableMapReduceUtil.addDependencyJars(fakeJob);
+ Collection<String> hbaseJars =
+ fakeJob.getConfiguration().getStringCollection("tmpjars");
+ for (String jarPath : hbaseJars) {
+ if (!jarPath.isEmpty()) {
+ lfs.copyFromLocalFile(new Path(jarPath), libDir);
+ }
+ }
+ } catch (Throwable t) {
+ String err =
+ "Failed to add HBase jars. Use --auxhbase=false to avoid localizing them";
+ LOG.error(err);
+ System.err.println(err);
+ throw new RuntimeException(t);
+ }
+ }
- // Copy over the mandatory configs for the package.
- for (String f : NEEDED_CONFIGS) {
- copyConfig(lfs, confPath, f);
- }
- for (String f : OPTIONAL_CONFIGS) {
- try {
- copyConfig(lfs, confPath, f);
- } catch (Throwable t) {
- LOG.info("Error getting an optional config " + f + "; ignoring: " + t.getMessage());
- }
- }
- createLlapDaemonConfig(lfs, confPath, conf, propsDirectOptions, options.getConfig());
-
- // logger can be a resource stream or a real file (cannot use copy)
- InputStream loggerContent = logger.openStream();
- IOUtils.copyBytes(loggerContent,
- lfs.create(new Path(confPath, "llap-daemon-log4j2.properties"), true), conf, true);
-
- String metricsFile = LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE;
- URL metrics2 = conf.getResource(metricsFile);
- if (metrics2 == null) {
- LOG.warn(LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " cannot be found." +
- " Looking for " + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE);
- metricsFile = LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE;
- metrics2 = conf.getResource(metricsFile);
- }
- if (metrics2 != null) {
- InputStream metrics2FileStream = metrics2.openStream();
- IOUtils.copyBytes(metrics2FileStream, lfs.create(new Path(confPath, metricsFile), true),
- conf, true);
- LOG.info("Copied hadoop metrics2 properties file from " + metrics2);
- } else {
- LOG.warn("Cannot find " + LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " or " +
- LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE + " in classpath.");
- }
+ String auxJars = options.getAuxJars();
+ if (auxJars != null && !auxJars.isEmpty()) {
+ // TODO: transitive dependencies warning?
+ String[] jarPaths = auxJars.split(",");
+ for (String jarPath : jarPaths) {
+ if (!jarPath.isEmpty()) {
+ lfs.copyFromLocalFile(new Path(jarPath), libDir);
+ }
+ }
+ }
+ return null;
+ }
+ };
+
+ NamedCallable<Void> copyUdfJars = new NamedCallable<Void>("copyUdfJars") {
+ @Override
+ public Void call() throws Exception {
+ // UDFs
+ final Set<String> allowedUdfs;
+
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOW_PERMANENT_FNS)) {
+ synchronized (fs) {
+ allowedUdfs = downloadPermanentFunctions(conf, udfDir);
+ }
+ } else {
+ allowedUdfs = Collections.emptySet();
+ }
- PrintWriter udfStream =
- new PrintWriter(lfs.create(new Path(confPath, StaticPermanentFunctionChecker.PERMANENT_FUNCTIONS_LIST)));
- for (String udfClass : allowedUdfs) {
- udfStream.println(udfClass);
- }
-
- udfStream.close();
+ PrintWriter udfStream =
+ new PrintWriter(lfs.create(new Path(confPath,
+ StaticPermanentFunctionChecker.PERMANENT_FUNCTIONS_LIST)));
+ for (String udfClass : allowedUdfs) {
+ udfStream.println(udfClass);
+ }
- // extract configs for processing by the python fragments in Slider
- JSONObject configs = new JSONObject();
+ udfStream.close();
+ return null;
+ }
+ };
+
+ String java_home;
+ if (options.getJavaPath() == null || options.getJavaPath().isEmpty()) {
+ java_home = System.getenv("JAVA_HOME");
+ String jre_home = System.getProperty("java.home");
+ if (java_home == null) {
+ java_home = jre_home;
+ } else if (!java_home.equals(jre_home)) {
+ LOG.warn("Java versions might not match : JAVA_HOME=[{}],process jre=[{}]", java_home,
+ jre_home);
+ }
+ } else {
+ java_home = options.getJavaPath();
+ }
+ if (java_home == null || java_home.isEmpty()) {
+ throw new RuntimeException(
+ "Could not determine JAVA_HOME from command line parameters, environment or system properties");
+ }
+ LOG.info("Using [{}] for JAVA_HOME", java_home);
+
+ NamedCallable<Void> copyConfigs = new NamedCallable<Void>("copyConfigs") {
+ @Override
+ public Void call() throws Exception {
+ // Copy over the mandatory configs for the package.
+ for (String f : NEEDED_CONFIGS) {
+ copyConfig(lfs, confPath, f);
+ }
+ for (String f : OPTIONAL_CONFIGS) {
+ try {
+ copyConfig(lfs, confPath, f);
+ } catch (Throwable t) {
+ LOG.info("Error getting an optional config " + f + "; ignoring: " + t.getMessage());
+ }
+ }
+ createLlapDaemonConfig(lfs, confPath, conf, propsDirectOptions, options.getConfig());
+
+ // logger can be a resource stream or a real file (cannot use copy)
+ InputStream loggerContent = logger.openStream();
+ IOUtils.copyBytes(loggerContent,
+ lfs.create(new Path(confPath, "llap-daemon-log4j2.properties"), true), conf, true);
+
+ String metricsFile = LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE;
+ URL metrics2 = conf.getResource(metricsFile);
+ if (metrics2 == null) {
+ LOG.warn(LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " cannot be found."
+ + " Looking for " + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE);
+ metricsFile = LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE;
+ metrics2 = conf.getResource(metricsFile);
+ }
+ if (metrics2 != null) {
+ InputStream metrics2FileStream = metrics2.openStream();
+ IOUtils.copyBytes(metrics2FileStream,
+ lfs.create(new Path(confPath, metricsFile), true), conf, true);
+ LOG.info("Copied hadoop metrics2 properties file from " + metrics2);
+ } else {
+ LOG.warn("Cannot find " + LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " or "
+ + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE + " in classpath.");
+ }
+ return null;
+ }
+ };
+
+ @SuppressWarnings("unchecked")
+ final NamedCallable<Void>[] asyncWork =
+ new NamedCallable[] {
+ downloadTez,
+ copyUdfJars,
+ copyLocalJars,
+ copyAuxJars,
+ copyConfigs };
+ @SuppressWarnings("unchecked")
+ final Future<Void>[] asyncResults = new Future[asyncWork.length];
+ for (int i = 0; i < asyncWork.length; i++) {
+ asyncResults[i] = asyncRunner.submit(asyncWork[i]);
+ }
- configs.put("java.home", java_home);
+ // extract configs for processing by the python fragments in Slider
+ JSONObject configs = new JSONObject();
- configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, HiveConf.getIntVar(conf,
- ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB));
- configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
+ configs.put("java.home", java_home);
- configs.put(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
- HiveConf.getSizeVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE));
+ configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
+ HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB));
+ configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
- configs.put(HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT.varname,
- HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT));
+ configs.put(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
+ HiveConf.getSizeVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE));
- configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, HiveConf.getIntVar(conf,
- ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
+ configs.put(HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT.varname,
+ HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT));
- configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname, HiveConf.getIntVar(conf,
- ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE));
+ configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
+ HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
- configs.put(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, HiveConf.getIntVar(conf,
- ConfVars.LLAP_DAEMON_NUM_EXECUTORS));
+ configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname,
+ HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE));
- // Let YARN pick the queue name, if it isn't provided in hive-site, or via the command-line
- if (HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME) != null) {
- configs.put(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname,
- HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME));
- }
+ configs.put(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
+ HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS));
- // Propagate the cluster name to the script.
- String clusterHosts = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
- if (!StringUtils.isEmpty(clusterHosts) && clusterHosts.startsWith("@") &&
- clusterHosts.length() > 1) {
- configs.put(CONFIG_CLUSTER_NAME, clusterHosts.substring(1));
- }
+ // Let YARN pick the queue name, if it isn't provided in hive-site, or via the command-line
+ if (HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME) != null) {
+ configs.put(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname,
+ HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME));
+ }
- configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
- conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1));
+ // Propagate the cluster name to the script.
+ String clusterHosts = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+ if (!StringUtils.isEmpty(clusterHosts) && clusterHosts.startsWith("@")
+ && clusterHosts.length() > 1) {
+ configs.put(CONFIG_CLUSTER_NAME, clusterHosts.substring(1));
+ }
+
+ configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1));
- configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
- conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, -1));
+ configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+ conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, -1));
- long maxDirect = (xmx > 0 && cache > 0 && xmx < cache * 1.25) ? (long)(cache * 1.25) : -1;
- configs.put("max_direct_memory", Long.toString(maxDirect));
+ long maxDirect = (xmx > 0 && cache > 0 && xmx < cache * 1.25) ? (long) (cache * 1.25) : -1;
+ configs.put("max_direct_memory", Long.toString(maxDirect));
- FSDataOutputStream os = lfs.create(new Path(tmpDir, "config.json"));
- OutputStreamWriter w = new OutputStreamWriter(os);
- configs.write(w);
- w.close();
- os.close();
+ FSDataOutputStream os = lfs.create(new Path(tmpDir, "config.json"));
+ OutputStreamWriter w = new OutputStreamWriter(os);
+ configs.write(w);
+ w.close();
+ os.close();
- lfs.close();
- fs.close();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Config generation took " + (System.nanoTime() - t0) + " ns");
+ }
+ for (int i = 0; i < asyncWork.length; i++) {
+ final long t1 = System.nanoTime();
+ asyncResults[i].get();
+ final long t2 = System.nanoTime();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(asyncWork[i].getName() + " waited for " + (t2 - t1) + " ns");
+ }
+ }
+ } finally {
+ executor.shutdown();
+ lfs.close();
+ fs.close();
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Exiting successfully");
@@ -520,7 +610,12 @@ public class LlapServiceDriver {
private Set<String> downloadPermanentFunctions(Configuration conf, Path udfDir) throws HiveException,
URISyntaxException, IOException {
Map<String,String> udfs = new HashMap<String, String>();
- Hive hive = Hive.get(false);
+ HiveConf hiveConf = new HiveConf();
+ // disable expensive operations on the metastore
+ hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_INIT_METADATA_COUNT_ENABLED, false);
+ hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_METRICS, false);
+ // performance problem: ObjectStore does its own new HiveConf()
+ Hive hive = Hive.getWithFastCheck(hiveConf, false);
ResourceDownloader resourceDownloader =
new ResourceDownloader(conf, udfDir.toUri().normalize().getPath());
List<Function> fns = hive.getAllFunctions();