You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/12/05 22:06:34 UTC
[1/2] hive git commit: HIVE-11358 : LLAP: move LlapConfiguration into
HiveConf and document the settings (Sergey Shelukhin,
reviewed by Lefty Leverenz, Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/branch-2.0 2a6ebb885 -> f40c1502d
refs/heads/master a75085688 -> 6cc5761b0
HIVE-11358 : LLAP: move LlapConfiguration into HiveConf and document the settings (Sergey Shelukhin, reviewed by Lefty Leverenz, Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6cc5761b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6cc5761b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6cc5761b
Branch: refs/heads/master
Commit: 6cc5761b0f4d9bb518e1fa7d1f5960e0b07feca5
Parents: a750856
Author: Sergey Shelukhin <se...@apache.org>
Authored: Sat Dec 5 12:59:40 2015 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Sat Dec 5 12:59:40 2015 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 163 ++++++++++++++++++-
data/conf/llap/llap-daemon-site.xml | 9 +-
.../llap/configuration/LlapConfiguration.java | 139 +---------------
.../registry/impl/LlapFixedRegistryImpl.java | 18 +-
.../llap/registry/impl/LlapRegistryService.java | 4 +-
.../registry/impl/LlapYarnRegistryImpl.java | 27 ++-
.../hadoop/hive/llap/cli/LlapServiceDriver.java | 30 ++--
.../hive/llap/daemon/impl/AMReporter.java | 20 +--
.../llap/daemon/impl/ContainerRunnerImpl.java | 6 +-
.../hive/llap/daemon/impl/LlapDaemon.java | 33 ++--
.../hive/llap/daemon/impl/QueryFileCleaner.java | 6 +-
.../hive/llap/daemon/impl/QueryTracker.java | 7 +-
.../daemon/services/impl/LlapWebServices.java | 9 +-
.../llap/tezplugins/LlapTaskCommunicator.java | 10 +-
.../hive/llap/tezplugins/TaskCommunicator.java | 15 +-
.../dag/app/rm/LlapTaskSchedulerService.java | 36 ++--
.../hive/llap/daemon/MiniLlapCluster.java | 9 +-
.../impl/TestLlapDaemonProtocolServerImpl.java | 8 +-
.../app/rm/TestLlapTaskSchedulerService.java | 13 +-
19 files changed, 280 insertions(+), 282 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 803d52b..d52f994 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2370,6 +2370,112 @@ public class HiveConf extends Configuration {
LLAP_IO_THREADPOOL_SIZE("hive.llap.io.threadpool.size", 10,
"Specify the number of threads to use for low-level IO thread pool."),
+ LLAP_DAEMON_RPC_NUM_HANDLERS("hive.llap.daemon.rpc.num.handlers", 5,
+ "Number of RPC handlers for LLAP daemon.", "llap.daemon.rpc.num.handlers"),
+ LLAP_DAEMON_WORK_DIRS("hive.llap.daemon.work.dirs", "",
+ "Working directories for the daemon. Needs to be set for a secure cluster, since LLAP may\n" +
+ "not have access to the default YARN working directories.", "llap.daemon.work.dirs"),
+ LLAP_DAEMON_YARN_SHUFFLE_PORT("hive.llap.daemon.yarn.shuffle.port", 15551,
+ "YARN shuffle port for LLAP-daemon-hosted shuffle.", "llap.daemon.yarn.shuffle.port"),
+ LLAP_DAEMON_YARN_CONTAINER_MB("hive.llap.daemon.yarn.container.mb", -1,
+ "TODO doc. Unused?", "llap.daemon.yarn.container.mb"),
+ LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED("hive.llap.daemon.shuffle.dir.watcher.enabled", false,
+ "TODO doc", "llap.daemon.shuffle.dir-watcher.enabled"),
+ LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS(
+ "hive.llap.daemon.am.liveness.heartbeat.interval.ms", "10000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Tez AM-LLAP heartbeat interval (milliseconds). This needs to be below the task timeout\n" +
+ "interval, but otherwise as high as possible to avoid unnecessary traffic.",
+ "llap.daemon.am.liveness.heartbeat.interval-ms"),
+ LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS(
+ "hive.llap.am.liveness.connection.timeout.ms", "10000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Amount of time to wait on connection failures to the AM from an LLAP daemon before\n" +
+ "considering the AM to be dead.", "llap.am.liveness.connection.timeout-millis"),
+ // Not used yet - since the Writable RPC engine does not support this policy.
+ LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS(
+ "hive.llap.am.liveness.connection.sleep.between.retries.ms", "2000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Sleep duration while waiting to retry connection failures to the AM from the daemon for\n" +
+ "the general keep-alive thread (milliseconds).",
+ "llap.am.liveness.connection.sleep-between-retries-millis"),
+ LLAP_DAEMON_NUM_EXECUTORS("hive.llap.daemon.num.executors", 4,
+ "Number of executors to use in LLAP daemon; essentially, the number of tasks that can be\n" +
+ "executed in parallel.", "llap.daemon.num.executors"),
+ LLAP_DAEMON_RPC_PORT("hive.llap.daemon.rpc.port", 15001, "The LLAP daemon RPC port.",
+ "llap.daemon.rpc.port"),
+ LLAP_DAEMON_MEMORY_PER_INSTANCE_MB("hive.llap.daemon.memory.per.instance.mb", 4096,
+ "The total amount of memory to use for the executors inside LLAP (in megabytes).",
+ "llap.daemon.memory.per.instance.mb"),
+ LLAP_DAEMON_VCPUS_PER_INSTANCE("hive.llap.daemon.vcpus.per.instance", 4,
+ "The total number of vcpus to use for the executors inside LLAP.",
+ "llap.daemon.vcpus.per.instance"),
+ LLAP_DAEMON_NUM_FILE_CLEANER_THREADS("hive.llap.daemon.num.file.cleaner.threads", 1,
+ "Number of file cleaner threads in LLAP.", "llap.daemon.num.file.cleaner.threads"),
+ LLAP_FILE_CLEANUP_DELAY_SECONDS("hive.llap.file.cleanup.delay.seconds", "300s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "How long to delay before cleaning up query files in LLAP (in seconds, for debugging).",
+ "llap.file.cleanup.delay-seconds"),
+ LLAP_DAEMON_SERVICE_HOSTS("hive.llap.daemon.service.hosts", "",
+ "Explicitly specified hosts to use for LLAP scheduling. Useful for testing. By default,\n" +
+ "YARN registry is used.", "llap.daemon.service.hosts"),
+ LLAP_DAEMON_SERVICE_REFRESH_INTERVAL("hive.llap.daemon.service.refresh.interval.sec", "60s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "LLAP YARN registry service list refresh delay, in seconds.",
+ "llap.daemon.service.refresh.interval"),
+ LLAP_DAEMON_COMMUNICATOR_NUM_THREADS("hive.llap.daemon.communicator.num.threads", 10,
+ "Number of threads to use in LLAP task communicator in Tez AM.",
+ "llap.daemon.communicator.num.threads"),
+ LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS(
+ "hive.llap.task.scheduler.node.reenable.min.timeout.ms", "200ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Minimum time after which a previously disabled node will be re-enabled for scheduling,\n" +
+ "in milliseconds. This may be modified by an exponential back-off if failures persist.",
+ "llap.task.scheduler.node.re-enable.min.timeout.ms"),
+ LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MS(
+ "hive.llap.task.scheduler.node.reenable.max.timeout.ms", "10000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Maximum time after which a previously disabled node will be re-enabled for scheduling,\n" +
+ "in milliseconds. This may be modified by an exponential back-off if failures persist.",
+ "llap.task.scheduler.node.re-enable.max.timeout.ms"),
+ LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR(
+ "hive.llap.task.scheduler.node.disable.backoff.factor", 1.5f,
+ "Backoff factor on successive blacklists of a node due to some failures. Blacklist times\n" +
+ "start at the min timeout and go up to the max timeout based on this backoff factor.",
+ "llap.task.scheduler.node.disable.backoff.factor"),
+ LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE(
+ "hive.llap.task.scheduler.num.schedulable.tasks.per.node", 0,
+ "The number of tasks the AM TaskScheduler will try allocating per node. 0 indicates that\n" +
+ "this should be picked up from the Registry. -1 indicates unlimited capacity; positive\n" +
+ "values indicate a specific bound.", "llap.task.scheduler.num.schedulable.tasks.per.node"),
+ LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE("hive.llap.daemon.task.scheduler.wait.queue.size",
+ 10, "LLAP scheduler maximum queue size.", "llap.daemon.task.scheduler.wait.queue.size"),
+ LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME(
+ "hive.llap.daemon.wait.queue.comparator.class.name",
+ "org.apache.hadoop.hive.llap.daemon.impl.comparator.ShortestJobFirstComparator",
+ "The priority comparator to use for LLAP scheduler priority queue. The built-in options\n" +
+ "are org.apache.hadoop.hive.llap.daemon.impl.comparator.ShortestJobFirstComparator and\n" +
+ ".....FirstInFirstOutComparator", "llap.daemon.wait.queue.comparator.class.name"),
+ LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION(
+ "hive.llap.daemon.task.scheduler.enable.preemption", true,
+ "Whether non-finishable running tasks (e.g. a reducer waiting for inputs) should be\n" +
+ "preempted by finishable tasks inside LLAP scheduler.",
+ "llap.daemon.task.scheduler.enable.preemption"),
+ LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS(
+ "hive.llap.task.communicator.connection.timeout.ms", "16000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Connection timeout (in milliseconds) before a failure to an LLAP daemon from Tez AM.",
+ "llap.task.communicator.connection.timeout-millis"),
+ LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS(
+ "hive.llap.task.communicator.connection.sleep.between.retries.ms", "2000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Sleep duration (in milliseconds) to wait before retrying on error when obtaining a\n" +
+ "connection to LLAP daemon from Tez AM.",
+ "llap.task.communicator.connection.sleep-between-retries-millis"),
+ LLAP_DAEMON_WEB_PORT("hive.llap.daemon.web.port", 15002, "LLAP daemon web UI port.",
+ "llap.daemon.service.port"),
+ LLAP_DAEMON_WEB_SSL("hive.llap.daemon.web.ssl", false,
+ "Whether LLAP daemon web UI should use SSL.", "llap.daemon.service.ssl"),
SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
"60s", new TimeValidator(TimeUnit.SECONDS),
@@ -2422,6 +2528,7 @@ public class HiveConf extends Configuration {
public final String varname;
+ private final String altName;
private final String defaultExpr;
public final String defaultStrVal;
@@ -2441,28 +2548,39 @@ public class HiveConf extends Configuration {
private final boolean caseSensitive;
ConfVars(String varname, Object defaultVal, String description) {
- this(varname, defaultVal, null, description, true, false);
+ this(varname, defaultVal, null, description, true, false, null);
+ }
+
+ ConfVars(String varname, Object defaultVal, String description, String altName) {
+ this(varname, defaultVal, null, description, true, false, altName);
+ }
+
+ ConfVars(String varname, Object defaultVal, Validator validator, String description,
+ String altName) {
+ this(varname, defaultVal, validator, description, true, false, altName);
}
ConfVars(String varname, Object defaultVal, String description, boolean excluded) {
- this(varname, defaultVal, null, description, true, excluded);
+ this(varname, defaultVal, null, description, true, excluded, null);
}
ConfVars(String varname, String defaultVal, boolean caseSensitive, String description) {
- this(varname, defaultVal, null, description, caseSensitive, false);
+ this(varname, defaultVal, null, description, caseSensitive, false, null);
}
ConfVars(String varname, Object defaultVal, Validator validator, String description) {
- this(varname, defaultVal, validator, description, true, false);
+ this(varname, defaultVal, validator, description, true, false, null);
}
- ConfVars(String varname, Object defaultVal, Validator validator, String description, boolean caseSensitive, boolean excluded) {
+ ConfVars(String varname, Object defaultVal, Validator validator, String description,
+ boolean caseSensitive, boolean excluded, String altName) {
this.varname = varname;
this.validator = validator;
this.description = description;
this.defaultExpr = defaultVal == null ? null : String.valueOf(defaultVal);
this.excluded = excluded;
this.caseSensitive = caseSensitive;
+ this.altName = altName;
if (defaultVal == null || defaultVal instanceof String) {
this.valClass = String.class;
this.valType = VarType.STRING;
@@ -2704,6 +2822,9 @@ public class HiveConf extends Configuration {
public static int getIntVar(Configuration conf, ConfVars var) {
assert (var.valClass == Integer.class) : var.varname;
+ if (var.altName != null) {
+ return conf.getInt(var.varname, conf.getInt(var.altName, var.defaultIntVal));
+ }
return conf.getInt(var.varname, var.defaultIntVal);
}
@@ -2798,10 +2919,16 @@ public class HiveConf extends Configuration {
public static long getLongVar(Configuration conf, ConfVars var) {
assert (var.valClass == Long.class) : var.varname;
+ if (var.altName != null) {
+ return conf.getLong(var.varname, conf.getLong(var.altName, var.defaultLongVal));
+ }
return conf.getLong(var.varname, var.defaultLongVal);
}
public static long getLongVar(Configuration conf, ConfVars var, long defaultVal) {
+ if (var.altName != null) {
+ return conf.getLong(var.varname, conf.getLong(var.altName, defaultVal));
+ }
return conf.getLong(var.varname, defaultVal);
}
@@ -2820,10 +2947,16 @@ public class HiveConf extends Configuration {
public static float getFloatVar(Configuration conf, ConfVars var) {
assert (var.valClass == Float.class) : var.varname;
+ if (var.altName != null) {
+ return conf.getFloat(var.varname, conf.getFloat(var.altName, var.defaultFloatVal));
+ }
return conf.getFloat(var.varname, var.defaultFloatVal);
}
public static float getFloatVar(Configuration conf, ConfVars var, float defaultVal) {
+ if (var.altName != null) {
+ return conf.getFloat(var.varname, conf.getFloat(var.altName, defaultVal));
+ }
return conf.getFloat(var.varname, defaultVal);
}
@@ -2842,10 +2975,16 @@ public class HiveConf extends Configuration {
public static boolean getBoolVar(Configuration conf, ConfVars var) {
assert (var.valClass == Boolean.class) : var.varname;
+ if (var.altName != null) {
+ return conf.getBoolean(var.varname, conf.getBoolean(var.altName, var.defaultBoolVal));
+ }
return conf.getBoolean(var.varname, var.defaultBoolVal);
}
public static boolean getBoolVar(Configuration conf, ConfVars var, boolean defaultVal) {
+ if (var.altName != null) {
+ return conf.getBoolean(var.varname, conf.getBoolean(var.altName, defaultVal));
+ }
return conf.getBoolean(var.varname, defaultVal);
}
@@ -2864,10 +3003,24 @@ public class HiveConf extends Configuration {
public static String getVar(Configuration conf, ConfVars var) {
assert (var.valClass == String.class) : var.varname;
+ if (var.altName != null) {
+ return conf.get(var.varname, conf.get(var.altName, var.defaultStrVal));
+ }
return conf.get(var.varname, var.defaultStrVal);
}
+ public static String getTrimmedVar(Configuration conf, ConfVars var) {
+ assert (var.valClass == String.class) : var.varname;
+ if (var.altName != null) {
+ return conf.getTrimmed(var.varname, conf.getTrimmed(var.altName, var.defaultStrVal));
+ }
+ return conf.getTrimmed(var.varname, var.defaultStrVal);
+ }
+
public static String getVar(Configuration conf, ConfVars var, String defaultVal) {
+ if (var.altName != null) {
+ return conf.get(var.varname, conf.get(var.altName, defaultVal));
+ }
return conf.get(var.varname, defaultVal);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/data/conf/llap/llap-daemon-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/llap/llap-daemon-site.xml b/data/conf/llap/llap-daemon-site.xml
index f2851a7..cc3e438 100644
--- a/data/conf/llap/llap-daemon-site.xml
+++ b/data/conf/llap/llap-daemon-site.xml
@@ -16,25 +16,24 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-
<configuration>
<property>
- <name>llap.daemon.service.hosts</name>
+ <name>hive.llap.daemon.service.hosts</name>
<value>localhost</value>
</property>
<property>
- <name>llap.daemon.service.port</name>
+ <name>hive.llap.daemon.web.port</name>
<value>0</value>
</property>
<property>
- <name>llap.daemon.num.executors</name>
+ <name>hive.llap.daemon.num.executors</name>
<value>4</value>
</property>
<property>
- <name>llap.daemon.task.scheduler.wait.queue.size</name>
+ <name>hive.llap.daemon.task.scheduler.wait.queue.size</name>
<value>4</value>
</property>
http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
index bd09024..abdbc09 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
@@ -19,6 +19,8 @@ import java.net.URL;
import org.apache.hadoop.conf.Configuration;
public class LlapConfiguration extends Configuration {
+ public static final String LLAP_PREFIX = "llap.";
+ public static final String LLAP_DAEMON_PREFIX = "llap.daemon.";
public LlapConfiguration(Configuration conf) {
super(conf);
@@ -35,142 +37,5 @@ public class LlapConfiguration extends Configuration {
addResource(llapDaemonConfLocation);
}
- public static final String LLAP_PREFIX = "llap.";
-
- public static final String LLAP_DAEMON_PREFIX = "llap.daemon.";
private static final String LLAP_DAEMON_SITE = "llap-daemon-site.xml";
-
-
-
- public static final String LLAP_DAEMON_RPC_NUM_HANDLERS = LLAP_DAEMON_PREFIX + "rpc.num.handlers";
- public static final int LLAP_DAEMON_RPC_NUM_HANDLERS_DEFAULT = 5;
-
- public static final String LLAP_DAEMON_WORK_DIRS = LLAP_DAEMON_PREFIX + "work.dirs";
-
- public static final String LLAP_DAEMON_YARN_SHUFFLE_PORT = LLAP_DAEMON_PREFIX + "yarn.shuffle.port";
- public static final int LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT = 15551;
-
- public static final String LLAP_DAEMON_YARN_CONTAINER_MB = LLAP_DAEMON_PREFIX + "yarn.container.mb";
- public static final int LLAP_DAEMON_YARN_CONTAINER_MB_DEFAULT = -1;
-
- public static final String LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED = LLAP_DAEMON_PREFIX + "shuffle.dir-watcher.enabled";
- public static final boolean LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED_DEFAULT = false;
-
- // This needs to be kept below the task timeout interval, but otherwise as high as possible to avoid unnecessary traffic.
- public static final String LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS = LLAP_DAEMON_PREFIX + "am.liveness.heartbeat.interval-ms";
- public static final long LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT = 10000l;
-
- /**
- * Amount of time to wait on connection failures to the AM from an LLAP daemon before considering
- * the AM to be dead
- */
- public static final String LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS =
- LLAP_PREFIX + "am.liveness.connection.timeout-millis";
- public static final long LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS_DEFAULT = 10000l;
-
- // Not used yet - since the Writable RPC engine does not support this policy.
- /**
- * Sleep duration while waiting to retry connection failures to the AM from the daemon for the
- * general keep-alive thread
- */
- public static final String LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS =
- LLAP_PREFIX + "am.liveness.connection.sleep-between-retries-millis";
- public static final long LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT =
- 2000l;
-
-
- // Section for configs used in AM and executors
- public static final String LLAP_DAEMON_NUM_EXECUTORS = LLAP_DAEMON_PREFIX + "num.executors";
- public static final int LLAP_DAEMON_NUM_EXECUTORS_DEFAULT = 4;
-
- public static final String LLAP_DAEMON_RPC_PORT = LLAP_DAEMON_PREFIX + "rpc.port";
- public static final int LLAP_DAEMON_RPC_PORT_DEFAULT = 15001;
-
- public static final String LLAP_DAEMON_MEMORY_PER_INSTANCE_MB = LLAP_DAEMON_PREFIX + "memory.per.instance.mb";
- public static final int LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT = 4096;
-
- public static final String LLAP_DAEMON_VCPUS_PER_INSTANCE = LLAP_DAEMON_PREFIX + "vcpus.per.instance";
- public static final int LLAP_DAEMON_VCPUS_PER_INSTANCE_DEFAULT = 4;
-
- public static final String LLAP_DAEMON_NUM_FILE_CLEANER_THREADS = LLAP_DAEMON_PREFIX + "num.file.cleaner.threads";
- public static final int LLAP_DAEMON_NUM_FILE_CLEANER_THREADS_DEFAULT = 1;
-
-
- // Section for configs used in the AM //
- public static final String LLAP_FILE_CLEANUP_DELAY_SECONDS = LLAP_PREFIX + "file.cleanup.delay-seconds";
- public static final long LLAP_FILE_CLEANUP_DELAY_SECONDS_DEFAULT = 300; // 5 minutes by default
-
- public static final String LLAP_DAEMON_SERVICE_HOSTS = LLAP_DAEMON_PREFIX + "service.hosts";
-
- public static final String LLAP_DAEMON_SERVICE_REFRESH_INTERVAL = LLAP_DAEMON_PREFIX + "service.refresh.interval";
- public static final int LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT = 60; // seconds
-
- public static final String LLAP_DAEMON_COMMUNICATOR_NUM_THREADS = LLAP_DAEMON_PREFIX + "communicator.num.threads";
- public static final int LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT = 10;
-
- /**
- * Minimum time after which a previously disabled node will be re-enabled for scheduling. This may
- * be modified by an exponential back-off if failures persist
- */
- public static final String LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS =
- LLAP_PREFIX + "task.scheduler.node.re-enable.min.timeout.ms";
- public static final long LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS_DEFAULT = 200l;
-
- /**
- * Maximum time after which a previously disabled node will be re-enabled for scheduling. This may
- * be modified by an exponential back-off if failures persist
- */
- public static final String LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS =
- LLAP_PREFIX + "task.scheduler.node.re-enable.max.timeout.ms";
- public static final long LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS_DEFAULT = 10000l;
-
- /**
- * Backoff factor on successive blacklists of a node. Blacklists timeouts start at the min timeout
- * and go up to the max timeout based on this backoff factor
- */
- public static final String LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR =
- LLAP_PREFIX + "task.scheduler.node.disable.backoff.factor";
- public static final float LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR_DEFAULT = 1.5f;
-
- /**
- * The number of tasks the AM TaskScheduler will try allocating per node.
- * 0 indicates that this should be picked up from the Registry.
- * -1 indicates unlimited capacity
- * >0 indicates a specific bound
- */
- public static final String LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE =
- LLAP_PREFIX + "task.scheduler.num.schedulable.tasks.per.node";
- public static final int LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE_DEFAULT = 0;
-
- public static final String LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE =
- LLAP_DAEMON_PREFIX + "task.scheduler.wait.queue.size";
- public static final int LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT = 10;
-
- public static final String LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME =
- LLAP_DAEMON_PREFIX + "wait.queue.comparator.class.name";
- public static final String LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME_DEFAULT =
- "org.apache.hadoop.hive.llap.daemon.impl.comparator.ShortestJobFirstComparator";
-
- public static final String LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION =
- LLAP_DAEMON_PREFIX + "task.scheduler.enable.preemption";
- public static final boolean LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION_DEFAULT = true;
-
-
- /** Amount of time to wait on a connection failure to an LLAP daemon */
- public static final String LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MILLIS =
- LLAP_PREFIX + "task.communicator.connection.timeout-millis";
- public static final long LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MILLIS_DEFAULT = 16000;
-
- /** Sleep duration while waiting for a connection failure */
- public static final String LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS =
- LLAP_PREFIX + "task.communicator.connection.sleep-between-retries-millis";
- public static final long LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT = 2000l;
-
-
-
- public static final String LLAP_DAEMON_SERVICE_PORT = LLAP_DAEMON_PREFIX + "service.port";
- public static final int LLAP_DAEMON_SERVICE_PORT_DEFAULT = 15002;
-
- public static final String LLAP_DAEMON_SERVICE_SSL = LLAP_DAEMON_PREFIX + "service.ssl";
- public static final boolean LLAP_DAEMON_SERVICE_SSL_DEFAULT = false;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index 34e0682..a085427 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -24,6 +24,8 @@ import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
@@ -53,12 +55,8 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
public LlapFixedRegistryImpl(String hosts, Configuration conf) {
this.hosts = hosts.split(",");
- this.port =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
- LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
- this.shuffle =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT,
- LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT);
+ this.port = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_RPC_PORT);
+ this.shuffle = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT);
this.resolveHosts = conf.getBoolean(FIXED_REGISTRY_RESOLVE_HOST_NAMES, true);
for (Map.Entry<String, String> kv : conf) {
@@ -70,12 +68,8 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
}
}
- this.memory =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
- LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT);
- this.vcores =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
- LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
+ this.memory = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB);
+ this.vcores = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index a8e1465..740f373 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -16,6 +16,8 @@ package org.apache.hadoop.hive.llap.registry.impl;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
@@ -37,7 +39,7 @@ public class LlapRegistryService extends AbstractService {
@Override
public void serviceInit(Configuration conf) {
- String hosts = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
+ String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
if (hosts.startsWith("@")) {
registry = new LlapYarnRegistryImpl(hosts.substring(1), conf, isDaemon);
} else {
http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
index d474b6f..2673ad7 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
@@ -32,6 +32,8 @@ import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
@@ -97,37 +99,28 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
encoder = new RegistryUtils.ServiceRecordMarshal();
this.path = RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(),
SERVICE_CLASS, instanceName, "workers"), "worker-");
- refreshDelay =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL,
- LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT);
+ refreshDelay = HiveConf.getTimeVar(
+ conf, ConfVars.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL, TimeUnit.SECONDS);
this.isDaemon = isDaemon;
Preconditions.checkArgument(refreshDelay > 0,
"Refresh delay for registry has to be positive = %d", refreshDelay);
}
public Endpoint getRpcEndpoint() {
- final int rpcPort =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
- LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
+ final int rpcPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_RPC_PORT);
return RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(hostname, rpcPort));
}
public Endpoint getShuffleEndpoint() {
- final int shufflePort =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT,
- LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT);
+ final int shufflePort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT);
// HTTP today, but might not be
return RegistryTypeUtils.inetAddrEndpoint("shuffle", ProtocolTypes.PROTOCOL_TCP, hostname,
shufflePort);
}
public Endpoint getServicesEndpoint() {
- final int servicePort =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_PORT,
- LlapConfiguration.LLAP_DAEMON_SERVICE_PORT_DEFAULT);
- final boolean isSSL =
- conf.getBoolean(LlapConfiguration.LLAP_DAEMON_SERVICE_SSL,
- LlapConfiguration.LLAP_DAEMON_SERVICE_SSL_DEFAULT);
+ final int servicePort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT);
+ final boolean isSSL = HiveConf.getBoolVar(conf, ConfVars.LLAP_DAEMON_WEB_SSL);
final String scheme = isSSL ? "https" : "http";
final URL serviceURL;
try {
@@ -238,8 +231,8 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
@Override
public Resource getResource() {
- int memory = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
- int vCores = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS));
+ int memory = Integer.valueOf(srv.get(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname));
+ int vCores = Integer.valueOf(srv.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname));
return Resource.newInstance(memory, vCores);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index ac7e20c..08d573b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.CompressionUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.cli.LlapOptionsProcessor.LlapOptions;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
@@ -128,7 +128,7 @@ public class LlapServiceDriver {
// as read by the AM
// if needed, use --hiveconf llap.daemon.service.hosts=@llap0 to dynamically switch between
// instances
- conf.set(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, "@" + options.getName());
+ conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + options.getName());
}
if (options.getSize() != -1) {
@@ -151,11 +151,11 @@ public class LlapServiceDriver {
final long containerSize = options.getSize() / (1024 * 1024);
Preconditions.checkArgument(containerSize >= minAlloc,
"Container size should be greater than minimum allocation(%s)", minAlloc + "m");
- conf.setLong(LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB, containerSize);
+ conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
}
if (options.getExecutors() != -1) {
- conf.setLong(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, options.getExecutors());
+ conf.setLong(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, options.getExecutors());
// TODO: vcpu settings - possibly when DRFA works right
}
@@ -167,7 +167,7 @@ public class LlapServiceDriver {
// Needs more explanation here
// Xmx is not the max heap value in JDK8
// You need to subtract 50% of the survivor fraction from this, to get actual usable memory before it goes into GC
- conf.setLong(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, (long)(options.getXmx())
+ conf.setLong(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, (long)(options.getXmx())
/ (1024 * 1024));
}
@@ -236,9 +236,8 @@ public class LlapServiceDriver {
// extract configs for processing by the python fragments in Slider
JSONObject configs = new JSONObject();
- configs.put(LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB, conf.getInt(
- LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB,
- LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB_DEFAULT));
+ configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, HiveConf.getIntVar(conf,
+ ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB));
configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname,
HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE));
@@ -246,17 +245,14 @@ public class LlapServiceDriver {
configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT.varname,
HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT));
- configs.put(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, conf.getInt(
- LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
- LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT));
+ configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, HiveConf.getIntVar(conf,
+ ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
- configs.put(LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE, conf.getInt(
- LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE,
- LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE_DEFAULT));
+ configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname, HiveConf.getIntVar(conf,
+ ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE));
- configs.put(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, conf.getInt(
- LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
- LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT));
+ configs.put(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, HiveConf.getIntVar(conf,
+ ConfVars.LLAP_DAEMON_NUM_EXECUTORS));
configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1));
http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 6d54fd4..f6711d8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -39,6 +39,8 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
@@ -113,16 +115,14 @@ public class AMReporter extends AbstractService {
ExecutorService rawExecutor2 = Executors.newFixedThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporterQueueDrainer").build());
this.queueLookupExecutor = MoreExecutors.listeningDecorator(rawExecutor2);
- this.heartbeatInterval =
- conf.getLong(LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS,
- LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT);
-
- this.retryTimeout =
- conf.getLong(LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS,
- LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS_DEFAULT);
- long retrySleep = conf.getLong(
- LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS,
- LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT);
+ this.heartbeatInterval = HiveConf.getTimeVar(
+ conf, ConfVars.LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
+
+ this.retryTimeout = HiveConf.getTimeVar(
+ conf, ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ long retrySleep = HiveConf.getTimeVar(
+ conf, ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
+ TimeUnit.MILLISECONDS);
this.retryPolicy = RetryPolicies
.retryUpToMaximumTimeWithFixedSleep(retryTimeout, retrySleep,
TimeUnit.MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 4b28b53..2139bb0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
@@ -98,9 +99,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
this.queryTracker = new QueryTracker(conf, localDirsBase);
addIfService(queryTracker);
- String waitQueueSchedulerClassName =
- conf.get(LlapConfiguration.LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME,
- LlapConfiguration.LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME_DEFAULT);
+ String waitQueueSchedulerClassName = HiveConf.getVar(
+ conf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME);
this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, waitQueueSchedulerClassName,
enablePreemption);
http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 98b1ccd..dbdf571 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -28,6 +28,7 @@ import javax.management.ObjectName;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
@@ -104,12 +105,10 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
this.numExecutors = numExecutors;
this.localDirs = localDirs;
- int waitQueueSize = daemonConf.getInt(
- LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE,
- LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT);
- boolean enablePreemption = daemonConf.getBoolean(
- LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION,
- LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION_DEFAULT);
+ int waitQueueSize = HiveConf.getIntVar(
+ daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE);
+ boolean enablePreemption = HiveConf.getBoolVar(
+ daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION);
LOG.info("Attempting to start LlapDaemonConf with the following configuration: " +
"numExecutors=" + numExecutors +
", rpcListenerPort=" + rpcPort +
@@ -135,13 +134,11 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
this.shuffleHandlerConf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, shufflePort);
this.shuffleHandlerConf.set(ShuffleHandler.SHUFFLE_HANDLER_LOCAL_DIRS,
StringUtils.arrayToString(localDirs));
- this.shuffleHandlerConf.setBoolean(ShuffleHandler.SHUFFLE_DIR_WATCHER_ENABLED, daemonConf
- .getBoolean(LlapConfiguration.LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED,
- LlapConfiguration.LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED_DEFAULT));
+ this.shuffleHandlerConf.setBoolean(ShuffleHandler.SHUFFLE_DIR_WATCHER_ENABLED,
+ HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED));
// Less frequently set parameter, not passing in as a param.
- int numHandlers = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS,
- LlapConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS_DEFAULT);
+ int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS);
// Initialize the metrics system
LlapMetricsSystem.initialize("LlapDaemon");
@@ -275,18 +272,14 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
// Cache settings will need to be setup in llap-daemon-site.xml - since the daemons don't read hive-site.xml
// Ideally, these properties should be part of LlapDameonConf rather than HiveConf
LlapConfiguration daemonConf = new LlapConfiguration();
- int numExecutors = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
- LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
+ int numExecutors = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
- String[] localDirs =
- daemonConf.getTrimmedStrings(LlapConfiguration.LLAP_DAEMON_WORK_DIRS);
- int rpcPort = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
- LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
+ String[] localDirs = daemonConf.getTrimmedStrings(ConfVars.LLAP_DAEMON_WORK_DIRS.varname);
+ int rpcPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_PORT);
int shufflePort = daemonConf
.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
- long executorMemoryBytes = daemonConf
- .getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
- LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT) * 1024l * 1024l;
+ long executorMemoryBytes = HiveConf.getIntVar(
+ daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l;
long cacheMemoryBytes =
HiveConf.getLongVar(daemonConf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
boolean isDirectCache =
http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java
index bc18a77..def1f9b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java
@@ -30,6 +30,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.service.AbstractService;
import org.apache.tez.common.CallableWithNdc;
@@ -46,8 +48,8 @@ public class QueryFileCleaner extends AbstractService {
public QueryFileCleaner(Configuration conf, FileSystem localFs) {
super(QueryFileCleaner.class.getName());
- int numCleanerThreads = conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS,
- LlapConfiguration.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS_DEFAULT);
+ int numCleanerThreads = HiveConf.getIntVar(
+ conf, ConfVars.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS);
ScheduledExecutorService rawExecutor = Executors.newScheduledThreadPool(numCleanerThreads,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryFileCleaner %d").build());
this.executorService = MoreExecutors.listeningDecorator(rawExecutor);
http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 2db2833..33d5671 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -16,6 +16,8 @@ package org.apache.hadoop.hive.llap.daemon.impl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
@@ -31,6 +33,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
@@ -83,8 +86,8 @@ public class QueryTracker extends CompositeService {
throw new RuntimeException("Failed to setup local filesystem instance", e);
}
- this.defaultDeleteDelaySeconds = conf.getLong(LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS,
- LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS_DEFAULT);
+ this.defaultDeleteDelaySeconds = HiveConf.getTimeVar(
+ conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS);
queryFileCleaner = new QueryFileCleaner(conf, localFs);
addService(queryFileCleaner);
http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
index 37910be..7856663 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
@@ -18,7 +18,8 @@
package org.apache.hadoop.hive.llap.daemon.services.impl;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.webapp.WebApp;
@@ -43,10 +44,8 @@ public class LlapWebServices extends AbstractService {
this.conf = new Configuration(conf);
this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
- this.port = conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_PORT,
- LlapConfiguration.LLAP_DAEMON_SERVICE_PORT_DEFAULT);
- this.ssl = conf.getBoolean(LlapConfiguration.LLAP_DAEMON_SERVICE_SSL,
- LlapConfiguration.LLAP_DAEMON_SERVICE_SSL_DEFAULT);
+ this.port = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT);
+ this.ssl = HiveConf.getBoolVar(conf, ConfVars.LLAP_DAEMON_WEB_SSL);
this.webAppInstance = new LlapWebApp();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index d327fc0..b93650d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -22,6 +22,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.collect.BiMap;
@@ -29,6 +30,8 @@ import com.google.common.collect.HashBiMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
@@ -113,11 +116,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
public void initialize() throws Exception {
super.initialize();
Configuration conf = getConf();
- int numThreads = conf.getInt(LlapConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS,
- LlapConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT);
+ int numThreads = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS);
this.communicator = new TaskCommunicator(numThreads, conf);
- this.deleteDelayOnDagComplete = conf.getLong(LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS,
- LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS_DEFAULT);
+ this.deleteDelayOnDagComplete = HiveConf.getTimeVar(
+ conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS);
LOG.info("Running LlapTaskCommunicator with "
+ "fileCleanupDelay=" + deleteDelayOnDagComplete
+ ", numCommunicatorThreads=" + numThreads);
http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
index 33e998c..8144165 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
@@ -41,6 +41,8 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.Message;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
@@ -79,14 +81,13 @@ public class TaskCommunicator extends AbstractService {
this.hostProxies = new ConcurrentHashMap<>();
this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
- long connectionTimeout =
- conf.getLong(LlapConfiguration.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MILLIS,
- LlapConfiguration.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MILLIS_DEFAULT);
- long retrySleep = conf.getLong(
- LlapConfiguration.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS,
- LlapConfiguration.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT);
- this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(connectionTimeout, retrySleep,
+ long connectionTimeout = HiveConf.getTimeVar(conf,
+ ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ long retrySleep = HiveConf.getTimeVar(conf,
+ ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
TimeUnit.MILLISECONDS);
+ this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
+ connectionTimeout, retrySleep, TimeUnit.MILLISECONDS);
this.requestManager = new RequestManager(numThreads);
ExecutorService localExecutor = Executors.newFixedThreadPool(1,
http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index e920f86..9821117 100644
--- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -50,6 +50,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
@@ -170,34 +172,26 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(),
taskSchedulerContext.getCustomClusterIdentifier());
- this.memoryPerInstance =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
- LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT);
- this.coresPerInstance =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE,
- LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE_DEFAULT);
- this.executorsPerInstance =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
- LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
+ this.memoryPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB);
+ this.coresPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE);
+ this.executorsPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
this.nodeBlacklistConf = new NodeBlacklistConf(
- conf.getLong(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS,
- LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS_DEFAULT),
- conf.getLong(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS,
- LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS_DEFAULT),
- conf.getFloat(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR,
- LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR_DEFAULT));
+ HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS,
+ TimeUnit.MILLISECONDS),
+ HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MS,
+ TimeUnit.MILLISECONDS),
+ HiveConf.getFloatVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR));
- this.numSchedulableTasksPerNode = conf.getInt(
- LlapConfiguration.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE,
- LlapConfiguration.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE_DEFAULT);
+ this.numSchedulableTasksPerNode =
+ HiveConf.getIntVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE);
int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance);
int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance);
this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, coresPerExecutor);
- String instanceId = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
+ String instanceId = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
- Preconditions.checkNotNull(instanceId, LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS
+ Preconditions.checkNotNull(instanceId, ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname
+ " must be defined");
ExecutorService executorServiceRaw =
@@ -999,7 +993,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
if (numSchedulableTasksConf == 0) {
int pendingQueueuCapacity = 0;
String pendingQueueCapacityString = serviceInstance.getProperties()
- .get(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE);
+ .get(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname);
if (LOG.isDebugEnabled()) {
LOG.debug("Setting up node: " + serviceInstance + ", with available capacity=" +
serviceInstance.getResource().getVirtualCores() + ", pendingQueueCapacity=" +
http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index 4525ab9..52ba360 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon;
import org.apache.hadoop.service.AbstractService;
@@ -150,16 +151,16 @@ public class MiniLlapCluster extends AbstractService {
public void serviceStart() {
llapDaemon.start();
- clusterSpecificConfiguration.set(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS,
+ clusterSpecificConfiguration.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname,
getServiceAddress().getHostName());
- clusterSpecificConfiguration.setInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
+ clusterSpecificConfiguration.setInt(ConfVars.LLAP_DAEMON_RPC_PORT.varname,
getServiceAddress().getPort());
clusterSpecificConfiguration.setInt(
- LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
+ ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
numExecutorsPerService);
clusterSpecificConfiguration.setLong(
- LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, execBytesPerService);
+ ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, execBytesPerService);
// Optimize local fetch does not work with LLAP due to different local directories
// used by containers and LLAP
clusterSpecificConfiguration
http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
index 8d45c95..bf8a673 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
@@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicReference;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
@@ -33,10 +35,8 @@ public class TestLlapDaemonProtocolServerImpl {
@Test(timeout = 10000)
public void test() throws ServiceException {
LlapConfiguration daemonConf = new LlapConfiguration();
- int rpcPort = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
- LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
- int numHandlers = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS,
- LlapConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS_DEFAULT);
+ int rpcPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_PORT);
+ int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS);
LlapDaemonProtocolServerImpl server =
new LlapDaemonProtocolServerImpl(numHandlers, mock(ContainerRunner.class),
new AtomicReference<InetSocketAddress>(), rpcPort);
http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
index 23724a4..4eccc06 100644
--- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
+++ b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
@@ -33,6 +33,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -323,17 +324,17 @@ public class TestLlapTaskSchedulerService {
TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) throws IOException,
InterruptedException {
this(disableTimeoutMillis, new String[]{HOST1, HOST2, HOST3}, 4,
- LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT);
+ ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.defaultIntVal);
}
TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize) throws
IOException, InterruptedException {
conf = new Configuration();
- conf.setStrings(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, hosts);
- conf.setInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, numExecutors);
- conf.setInt(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE, waitQueueSize);
- conf.setLong(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS,
- disableTimeoutMillis);
+ conf.setStrings(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, hosts);
+ conf.setInt(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, numExecutors);
+ conf.setInt(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname, waitQueueSize);
+ conf.set(ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS.varname,
+ disableTimeoutMillis + "ms");
conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, false);
doReturn(appAttemptId).when(mockAppCallback).getApplicationAttemptId();
[2/2] hive git commit: HIVE-11358 : LLAP: move LlapConfiguration into
HiveConf and document the settings (Sergey Shelukhin,
reviewed by Lefty Leverenz, Siddharth Seth)
Posted by se...@apache.org.
HIVE-11358 : LLAP: move LlapConfiguration into HiveConf and document the settings (Sergey Shelukhin, reviewed by Lefty Leverenz, Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f40c1502
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f40c1502
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f40c1502
Branch: refs/heads/branch-2.0
Commit: f40c1502d5bf2d466efb55c644c36ff62435bc6b
Parents: 2a6ebb8
Author: Sergey Shelukhin <se...@apache.org>
Authored: Sat Dec 5 12:59:40 2015 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Sat Dec 5 13:06:18 2015 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 163 ++++++++++++++++++-
data/conf/llap/llap-daemon-site.xml | 9 +-
.../llap/configuration/LlapConfiguration.java | 139 +---------------
.../registry/impl/LlapFixedRegistryImpl.java | 18 +-
.../llap/registry/impl/LlapRegistryService.java | 4 +-
.../registry/impl/LlapYarnRegistryImpl.java | 27 ++-
.../hadoop/hive/llap/cli/LlapServiceDriver.java | 30 ++--
.../hive/llap/daemon/impl/AMReporter.java | 20 +--
.../llap/daemon/impl/ContainerRunnerImpl.java | 6 +-
.../hive/llap/daemon/impl/LlapDaemon.java | 33 ++--
.../hive/llap/daemon/impl/QueryFileCleaner.java | 6 +-
.../hive/llap/daemon/impl/QueryTracker.java | 7 +-
.../daemon/services/impl/LlapWebServices.java | 9 +-
.../llap/tezplugins/LlapTaskCommunicator.java | 10 +-
.../hive/llap/tezplugins/TaskCommunicator.java | 15 +-
.../dag/app/rm/LlapTaskSchedulerService.java | 36 ++--
.../hive/llap/daemon/MiniLlapCluster.java | 9 +-
.../impl/TestLlapDaemonProtocolServerImpl.java | 8 +-
.../app/rm/TestLlapTaskSchedulerService.java | 13 +-
19 files changed, 280 insertions(+), 282 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f40c1502/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 4d881ba..323a9c2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2366,6 +2366,112 @@ public class HiveConf extends Configuration {
LLAP_IO_THREADPOOL_SIZE("hive.llap.io.threadpool.size", 10,
"Specify the number of threads to use for low-level IO thread pool."),
+ LLAP_DAEMON_RPC_NUM_HANDLERS("hive.llap.daemon.rpc.num.handlers", 5,
+ "Number of RPC handlers for LLAP daemon.", "llap.daemon.rpc.num.handlers"),
+ LLAP_DAEMON_WORK_DIRS("hive.llap.daemon.work.dirs", "",
+ "Working directories for the daemon. Needs to be set for a secure cluster, since LLAP may\n" +
+ "not have access to the default YARN working directories.", "llap.daemon.work.dirs"),
+ LLAP_DAEMON_YARN_SHUFFLE_PORT("hive.llap.daemon.yarn.shuffle.port", 15551,
+ "YARN shuffle port for LLAP-daemon-hosted shuffle.", "llap.daemon.yarn.shuffle.port"),
+ LLAP_DAEMON_YARN_CONTAINER_MB("hive.llap.daemon.yarn.container.mb", -1,
+ "TODO doc. Unused?", "llap.daemon.yarn.container.mb"),
+ LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED("hive.llap.daemon.shuffle.dir.watcher.enabled", false,
+ "TODO doc", "llap.daemon.shuffle.dir-watcher.enabled"),
+ LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS(
+ "hive.llap.daemon.am.liveness.heartbeat.interval.ms", "10000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Tez AM-LLAP heartbeat interval (milliseconds). This needs to be below the task timeout\n" +
+ "interval, but otherwise as high as possible to avoid unnecessary traffic.",
+ "llap.daemon.am.liveness.heartbeat.interval-ms"),
+ LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS(
+ "hive.llap.am.liveness.connection.timeout.ms", "10000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Amount of time to wait on connection failures to the AM from an LLAP daemon before\n" +
+ "considering the AM to be dead.", "llap.am.liveness.connection.timeout-millis"),
+ // Not used yet - since the Writable RPC engine does not support this policy.
+ LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS(
+ "hive.llap.am.liveness.connection.sleep.between.retries.ms", "2000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Sleep duration while waiting to retry connection failures to the AM from the daemon for\n" +
+ "the general keep-alive thread (milliseconds).",
+ "llap.am.liveness.connection.sleep-between-retries-millis"),
+ LLAP_DAEMON_NUM_EXECUTORS("hive.llap.daemon.num.executors", 4,
+ "Number of executors to use in LLAP daemon; essentially, the number of tasks that can be\n" +
+ "executed in parallel.", "llap.daemon.num.executors"),
+ LLAP_DAEMON_RPC_PORT("hive.llap.daemon.rpc.port", 15001, "The LLAP daemon RPC port.",
+ "llap.daemon.rpc.port"),
+ LLAP_DAEMON_MEMORY_PER_INSTANCE_MB("hive.llap.daemon.memory.per.instance.mb", 4096,
+ "The total amount of memory to use for the executors inside LLAP (in megabytes).",
+ "llap.daemon.memory.per.instance.mb"),
+ LLAP_DAEMON_VCPUS_PER_INSTANCE("hive.llap.daemon.vcpus.per.instance", 4,
+ "The total number of vcpus to use for the executors inside LLAP.",
+ "llap.daemon.vcpus.per.instance"),
+ LLAP_DAEMON_NUM_FILE_CLEANER_THREADS("hive.llap.daemon.num.file.cleaner.threads", 1,
+ "Number of file cleaner threads in LLAP.", "llap.daemon.num.file.cleaner.threads"),
+ LLAP_FILE_CLEANUP_DELAY_SECONDS("hive.llap.file.cleanup.delay.seconds", "300s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "How long to delay before cleaning up query files in LLAP (in seconds, for debugging).",
+ "llap.file.cleanup.delay-seconds"),
+ LLAP_DAEMON_SERVICE_HOSTS("hive.llap.daemon.service.hosts", "",
+ "Explicitly specified hosts to use for LLAP scheduling. Useful for testing. By default,\n" +
+ "YARN registry is used.", "llap.daemon.service.hosts"),
+ LLAP_DAEMON_SERVICE_REFRESH_INTERVAL("hive.llap.daemon.service.refresh.interval.sec", "60s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "LLAP YARN registry service list refresh delay, in seconds.",
+ "llap.daemon.service.refresh.interval"),
+ LLAP_DAEMON_COMMUNICATOR_NUM_THREADS("hive.llap.daemon.communicator.num.threads", 10,
+ "Number of threads to use in LLAP task communicator in Tez AM.",
+ "llap.daemon.communicator.num.threads"),
+ LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS(
+ "hive.llap.task.scheduler.node.reenable.min.timeout.ms", "200ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Minimum time after which a previously disabled node will be re-enabled for scheduling,\n" +
+ "in milliseconds. This may be modified by an exponential back-off if failures persist.",
+ "llap.task.scheduler.node.re-enable.min.timeout.ms"),
+ LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MS(
+ "hive.llap.task.scheduler.node.reenable.max.timeout.ms", "10000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Maximum time after which a previously disabled node will be re-enabled for scheduling,\n" +
+ "in milliseconds. This may be modified by an exponential back-off if failures persist.",
+ "llap.task.scheduler.node.re-enable.max.timeout.ms"),
+ LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR(
+ "hive.llap.task.scheduler.node.disable.backoff.factor", 1.5f,
+ "Backoff factor on successive blacklists of a node due to some failures. Blacklist times\n" +
+ "start at the min timeout and go up to the max timeout based on this backoff factor.",
+ "llap.task.scheduler.node.disable.backoff.factor"),
+ LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE(
+ "hive.llap.task.scheduler.num.schedulable.tasks.per.node", 0,
+ "The number of tasks the AM TaskScheduler will try allocating per node. 0 indicates that\n" +
+ "this should be picked up from the Registry. -1 indicates unlimited capacity; positive\n" +
+ "values indicate a specific bound.", "llap.task.scheduler.num.schedulable.tasks.per.node"),
+ LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE("hive.llap.daemon.task.scheduler.wait.queue.size",
+ 10, "LLAP scheduler maximum queue size.", "llap.daemon.task.scheduler.wait.queue.size"),
+ LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME(
+ "hive.llap.daemon.wait.queue.comparator.class.name",
+ "org.apache.hadoop.hive.llap.daemon.impl.comparator.ShortestJobFirstComparator",
+ "The priority comparator to use for LLAP scheduler priority queue. The built-in options\n" +
+ "are org.apache.hadoop.hive.llap.daemon.impl.comparator.ShortestJobFirstComparator and\n" +
+ ".....FirstInFirstOutComparator", "llap.daemon.wait.queue.comparator.class.name"),
+ LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION(
+ "hive.llap.daemon.task.scheduler.enable.preemption", true,
+ "Whether non-finishable running tasks (e.g. a reducer waiting for inputs) should be\n" +
+ "preempted by finishable tasks inside LLAP scheduler.",
+ "llap.daemon.task.scheduler.enable.preemption"),
+ LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS(
+ "hive.llap.task.communicator.connection.timeout.ms", "16000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Connection timeout (in milliseconds) before a failure to an LLAP daemon from Tez AM.",
+ "llap.task.communicator.connection.timeout-millis"),
+ LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS(
+ "hive.llap.task.communicator.connection.sleep.between.retries.ms", "2000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Sleep duration (in milliseconds) to wait before retrying on error when obtaining a\n" +
+ "connection to LLAP daemon from Tez AM.",
+ "llap.task.communicator.connection.sleep-between-retries-millis"),
+ LLAP_DAEMON_WEB_PORT("hive.llap.daemon.web.port", 15002, "LLAP daemon web UI port.",
+ "llap.daemon.service.port"),
+ LLAP_DAEMON_WEB_SSL("hive.llap.daemon.web.ssl", false,
+ "Whether LLAP daemon web UI should use SSL.", "llap.daemon.service.ssl"),
SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
"60s", new TimeValidator(TimeUnit.SECONDS),
@@ -2418,6 +2524,7 @@ public class HiveConf extends Configuration {
public final String varname;
+ private final String altName;
private final String defaultExpr;
public final String defaultStrVal;
@@ -2437,28 +2544,39 @@ public class HiveConf extends Configuration {
private final boolean caseSensitive;
ConfVars(String varname, Object defaultVal, String description) {
- this(varname, defaultVal, null, description, true, false);
+ this(varname, defaultVal, null, description, true, false, null);
+ }
+
+ ConfVars(String varname, Object defaultVal, String description, String altName) {
+ this(varname, defaultVal, null, description, true, false, altName);
+ }
+
+ ConfVars(String varname, Object defaultVal, Validator validator, String description,
+ String altName) {
+ this(varname, defaultVal, validator, description, true, false, altName);
}
ConfVars(String varname, Object defaultVal, String description, boolean excluded) {
- this(varname, defaultVal, null, description, true, excluded);
+ this(varname, defaultVal, null, description, true, excluded, null);
}
ConfVars(String varname, String defaultVal, boolean caseSensitive, String description) {
- this(varname, defaultVal, null, description, caseSensitive, false);
+ this(varname, defaultVal, null, description, caseSensitive, false, null);
}
ConfVars(String varname, Object defaultVal, Validator validator, String description) {
- this(varname, defaultVal, validator, description, true, false);
+ this(varname, defaultVal, validator, description, true, false, null);
}
- ConfVars(String varname, Object defaultVal, Validator validator, String description, boolean caseSensitive, boolean excluded) {
+ ConfVars(String varname, Object defaultVal, Validator validator, String description,
+ boolean caseSensitive, boolean excluded, String altName) {
this.varname = varname;
this.validator = validator;
this.description = description;
this.defaultExpr = defaultVal == null ? null : String.valueOf(defaultVal);
this.excluded = excluded;
this.caseSensitive = caseSensitive;
+ this.altName = altName;
if (defaultVal == null || defaultVal instanceof String) {
this.valClass = String.class;
this.valType = VarType.STRING;
@@ -2700,6 +2818,9 @@ public class HiveConf extends Configuration {
public static int getIntVar(Configuration conf, ConfVars var) {
assert (var.valClass == Integer.class) : var.varname;
+ if (var.altName != null) {
+ return conf.getInt(var.varname, conf.getInt(var.altName, var.defaultIntVal));
+ }
return conf.getInt(var.varname, var.defaultIntVal);
}
@@ -2794,10 +2915,16 @@ public class HiveConf extends Configuration {
public static long getLongVar(Configuration conf, ConfVars var) {
assert (var.valClass == Long.class) : var.varname;
+ if (var.altName != null) {
+ return conf.getLong(var.varname, conf.getLong(var.altName, var.defaultLongVal));
+ }
return conf.getLong(var.varname, var.defaultLongVal);
}
public static long getLongVar(Configuration conf, ConfVars var, long defaultVal) {
+ if (var.altName != null) {
+ return conf.getLong(var.varname, conf.getLong(var.altName, defaultVal));
+ }
return conf.getLong(var.varname, defaultVal);
}
@@ -2816,10 +2943,16 @@ public class HiveConf extends Configuration {
public static float getFloatVar(Configuration conf, ConfVars var) {
assert (var.valClass == Float.class) : var.varname;
+ if (var.altName != null) {
+ return conf.getFloat(var.varname, conf.getFloat(var.altName, var.defaultFloatVal));
+ }
return conf.getFloat(var.varname, var.defaultFloatVal);
}
public static float getFloatVar(Configuration conf, ConfVars var, float defaultVal) {
+ if (var.altName != null) {
+ return conf.getFloat(var.varname, conf.getFloat(var.altName, defaultVal));
+ }
return conf.getFloat(var.varname, defaultVal);
}
@@ -2838,10 +2971,16 @@ public class HiveConf extends Configuration {
public static boolean getBoolVar(Configuration conf, ConfVars var) {
assert (var.valClass == Boolean.class) : var.varname;
+ if (var.altName != null) {
+ return conf.getBoolean(var.varname, conf.getBoolean(var.altName, var.defaultBoolVal));
+ }
return conf.getBoolean(var.varname, var.defaultBoolVal);
}
public static boolean getBoolVar(Configuration conf, ConfVars var, boolean defaultVal) {
+ if (var.altName != null) {
+ return conf.getBoolean(var.varname, conf.getBoolean(var.altName, defaultVal));
+ }
return conf.getBoolean(var.varname, defaultVal);
}
@@ -2860,10 +2999,24 @@ public class HiveConf extends Configuration {
public static String getVar(Configuration conf, ConfVars var) {
assert (var.valClass == String.class) : var.varname;
+ if (var.altName != null) {
+ return conf.get(var.varname, conf.get(var.altName, var.defaultStrVal));
+ }
return conf.get(var.varname, var.defaultStrVal);
}
+ public static String getTrimmedVar(Configuration conf, ConfVars var) {
+ assert (var.valClass == String.class) : var.varname;
+ if (var.altName != null) {
+ return conf.getTrimmed(var.varname, conf.getTrimmed(var.altName, var.defaultStrVal));
+ }
+ return conf.getTrimmed(var.varname, var.defaultStrVal);
+ }
+
public static String getVar(Configuration conf, ConfVars var, String defaultVal) {
+ if (var.altName != null) {
+ return conf.get(var.varname, conf.get(var.altName, defaultVal));
+ }
return conf.get(var.varname, defaultVal);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f40c1502/data/conf/llap/llap-daemon-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/llap/llap-daemon-site.xml b/data/conf/llap/llap-daemon-site.xml
index f2851a7..cc3e438 100644
--- a/data/conf/llap/llap-daemon-site.xml
+++ b/data/conf/llap/llap-daemon-site.xml
@@ -16,25 +16,24 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-
<configuration>
<property>
- <name>llap.daemon.service.hosts</name>
+ <name>hive.llap.daemon.service.hosts</name>
<value>localhost</value>
</property>
<property>
- <name>llap.daemon.service.port</name>
+ <name>hive.llap.daemon.service.port</name>
<value>0</value>
</property>
<property>
- <name>llap.daemon.num.executors</name>
+ <name>hive.llap.daemon.num.executors</name>
<value>4</value>
</property>
<property>
- <name>llap.daemon.task.scheduler.wait.queue.size</name>
+ <name>hive.llap.daemon.task.scheduler.wait.queue.size</name>
<value>4</value>
</property>
http://git-wip-us.apache.org/repos/asf/hive/blob/f40c1502/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
index bd09024..abdbc09 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
@@ -19,6 +19,8 @@ import java.net.URL;
import org.apache.hadoop.conf.Configuration;
public class LlapConfiguration extends Configuration {
+ public static final String LLAP_PREFIX = "llap.";
+ public static final String LLAP_DAEMON_PREFIX = "llap.daemon.";
public LlapConfiguration(Configuration conf) {
super(conf);
@@ -35,142 +37,5 @@ public class LlapConfiguration extends Configuration {
addResource(llapDaemonConfLocation);
}
- public static final String LLAP_PREFIX = "llap.";
-
- public static final String LLAP_DAEMON_PREFIX = "llap.daemon.";
private static final String LLAP_DAEMON_SITE = "llap-daemon-site.xml";
-
-
-
- public static final String LLAP_DAEMON_RPC_NUM_HANDLERS = LLAP_DAEMON_PREFIX + "rpc.num.handlers";
- public static final int LLAP_DAEMON_RPC_NUM_HANDLERS_DEFAULT = 5;
-
- public static final String LLAP_DAEMON_WORK_DIRS = LLAP_DAEMON_PREFIX + "work.dirs";
-
- public static final String LLAP_DAEMON_YARN_SHUFFLE_PORT = LLAP_DAEMON_PREFIX + "yarn.shuffle.port";
- public static final int LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT = 15551;
-
- public static final String LLAP_DAEMON_YARN_CONTAINER_MB = LLAP_DAEMON_PREFIX + "yarn.container.mb";
- public static final int LLAP_DAEMON_YARN_CONTAINER_MB_DEFAULT = -1;
-
- public static final String LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED = LLAP_DAEMON_PREFIX + "shuffle.dir-watcher.enabled";
- public static final boolean LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED_DEFAULT = false;
-
- // This needs to be kept below the task timeout interval, but otherwise as high as possible to avoid unnecessary traffic.
- public static final String LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS = LLAP_DAEMON_PREFIX + "am.liveness.heartbeat.interval-ms";
- public static final long LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT = 10000l;
-
- /**
- * Amount of time to wait on connection failures to the AM from an LLAP daemon before considering
- * the AM to be dead
- */
- public static final String LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS =
- LLAP_PREFIX + "am.liveness.connection.timeout-millis";
- public static final long LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS_DEFAULT = 10000l;
-
- // Not used yet - since the Writable RPC engine does not support this policy.
- /**
- * Sleep duration while waiting to retry connection failures to the AM from the daemon for the
- * general keep-alive thread
- */
- public static final String LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS =
- LLAP_PREFIX + "am.liveness.connection.sleep-between-retries-millis";
- public static final long LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT =
- 2000l;
-
-
- // Section for configs used in AM and executors
- public static final String LLAP_DAEMON_NUM_EXECUTORS = LLAP_DAEMON_PREFIX + "num.executors";
- public static final int LLAP_DAEMON_NUM_EXECUTORS_DEFAULT = 4;
-
- public static final String LLAP_DAEMON_RPC_PORT = LLAP_DAEMON_PREFIX + "rpc.port";
- public static final int LLAP_DAEMON_RPC_PORT_DEFAULT = 15001;
-
- public static final String LLAP_DAEMON_MEMORY_PER_INSTANCE_MB = LLAP_DAEMON_PREFIX + "memory.per.instance.mb";
- public static final int LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT = 4096;
-
- public static final String LLAP_DAEMON_VCPUS_PER_INSTANCE = LLAP_DAEMON_PREFIX + "vcpus.per.instance";
- public static final int LLAP_DAEMON_VCPUS_PER_INSTANCE_DEFAULT = 4;
-
- public static final String LLAP_DAEMON_NUM_FILE_CLEANER_THREADS = LLAP_DAEMON_PREFIX + "num.file.cleaner.threads";
- public static final int LLAP_DAEMON_NUM_FILE_CLEANER_THREADS_DEFAULT = 1;
-
-
- // Section for configs used in the AM //
- public static final String LLAP_FILE_CLEANUP_DELAY_SECONDS = LLAP_PREFIX + "file.cleanup.delay-seconds";
- public static final long LLAP_FILE_CLEANUP_DELAY_SECONDS_DEFAULT = 300; // 5 minutes by default
-
- public static final String LLAP_DAEMON_SERVICE_HOSTS = LLAP_DAEMON_PREFIX + "service.hosts";
-
- public static final String LLAP_DAEMON_SERVICE_REFRESH_INTERVAL = LLAP_DAEMON_PREFIX + "service.refresh.interval";
- public static final int LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT = 60; // seconds
-
- public static final String LLAP_DAEMON_COMMUNICATOR_NUM_THREADS = LLAP_DAEMON_PREFIX + "communicator.num.threads";
- public static final int LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT = 10;
-
- /**
- * Minimum time after which a previously disabled node will be re-enabled for scheduling. This may
- * be modified by an exponential back-off if failures persist
- */
- public static final String LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS =
- LLAP_PREFIX + "task.scheduler.node.re-enable.min.timeout.ms";
- public static final long LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS_DEFAULT = 200l;
-
- /**
- * Maximum time after which a previously disabled node will be re-enabled for scheduling. This may
- * be modified by an exponential back-off if failures persist
- */
- public static final String LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS =
- LLAP_PREFIX + "task.scheduler.node.re-enable.max.timeout.ms";
- public static final long LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS_DEFAULT = 10000l;
-
- /**
- * Backoff factor on successive blacklists of a node. Blacklists timeouts start at the min timeout
- * and go up to the max timeout based on this backoff factor
- */
- public static final String LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR =
- LLAP_PREFIX + "task.scheduler.node.disable.backoff.factor";
- public static final float LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR_DEFAULT = 1.5f;
-
- /**
- * The number of tasks the AM TaskScheduler will try allocating per node.
- * 0 indicates that this should be picked up from the Registry.
- * -1 indicates unlimited capacity
- * >0 indicates a specific bound
- */
- public static final String LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE =
- LLAP_PREFIX + "task.scheduler.num.schedulable.tasks.per.node";
- public static final int LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE_DEFAULT = 0;
-
- public static final String LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE =
- LLAP_DAEMON_PREFIX + "task.scheduler.wait.queue.size";
- public static final int LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT = 10;
-
- public static final String LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME =
- LLAP_DAEMON_PREFIX + "wait.queue.comparator.class.name";
- public static final String LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME_DEFAULT =
- "org.apache.hadoop.hive.llap.daemon.impl.comparator.ShortestJobFirstComparator";
-
- public static final String LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION =
- LLAP_DAEMON_PREFIX + "task.scheduler.enable.preemption";
- public static final boolean LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION_DEFAULT = true;
-
-
- /** Amount of time to wait on a connection failure to an LLAP daemon */
- public static final String LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MILLIS =
- LLAP_PREFIX + "task.communicator.connection.timeout-millis";
- public static final long LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MILLIS_DEFAULT = 16000;
-
- /** Sleep duration while waiting for a connection failure */
- public static final String LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS =
- LLAP_PREFIX + "task.communicator.connection.sleep-between-retries-millis";
- public static final long LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT = 2000l;
-
-
-
- public static final String LLAP_DAEMON_SERVICE_PORT = LLAP_DAEMON_PREFIX + "service.port";
- public static final int LLAP_DAEMON_SERVICE_PORT_DEFAULT = 15002;
-
- public static final String LLAP_DAEMON_SERVICE_SSL = LLAP_DAEMON_PREFIX + "service.ssl";
- public static final boolean LLAP_DAEMON_SERVICE_SSL_DEFAULT = false;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f40c1502/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index 34e0682..a085427 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -24,6 +24,8 @@ import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
@@ -53,12 +55,8 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
public LlapFixedRegistryImpl(String hosts, Configuration conf) {
this.hosts = hosts.split(",");
- this.port =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
- LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
- this.shuffle =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT,
- LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT);
+ this.port = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_RPC_PORT);
+ this.shuffle = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT);
this.resolveHosts = conf.getBoolean(FIXED_REGISTRY_RESOLVE_HOST_NAMES, true);
for (Map.Entry<String, String> kv : conf) {
@@ -70,12 +68,8 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
}
}
- this.memory =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
- LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT);
- this.vcores =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
- LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
+ this.memory = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB);
+ this.vcores = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/f40c1502/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index a8e1465..740f373 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -16,6 +16,8 @@ package org.apache.hadoop.hive.llap.registry.impl;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
@@ -37,7 +39,7 @@ public class LlapRegistryService extends AbstractService {
@Override
public void serviceInit(Configuration conf) {
- String hosts = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
+ String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
if (hosts.startsWith("@")) {
registry = new LlapYarnRegistryImpl(hosts.substring(1), conf, isDaemon);
} else {
http://git-wip-us.apache.org/repos/asf/hive/blob/f40c1502/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
index d474b6f..2673ad7 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
@@ -32,6 +32,8 @@ import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
@@ -97,37 +99,28 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
encoder = new RegistryUtils.ServiceRecordMarshal();
this.path = RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(),
SERVICE_CLASS, instanceName, "workers"), "worker-");
- refreshDelay =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL,
- LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT);
+ refreshDelay = HiveConf.getTimeVar(
+ conf, ConfVars.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL, TimeUnit.SECONDS);
this.isDaemon = isDaemon;
Preconditions.checkArgument(refreshDelay > 0,
"Refresh delay for registry has to be positive = %d", refreshDelay);
}
public Endpoint getRpcEndpoint() {
- final int rpcPort =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
- LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
+ final int rpcPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_RPC_PORT);
return RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(hostname, rpcPort));
}
public Endpoint getShuffleEndpoint() {
- final int shufflePort =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT,
- LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT);
+ final int shufflePort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT);
// HTTP today, but might not be
return RegistryTypeUtils.inetAddrEndpoint("shuffle", ProtocolTypes.PROTOCOL_TCP, hostname,
shufflePort);
}
public Endpoint getServicesEndpoint() {
- final int servicePort =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_PORT,
- LlapConfiguration.LLAP_DAEMON_SERVICE_PORT_DEFAULT);
- final boolean isSSL =
- conf.getBoolean(LlapConfiguration.LLAP_DAEMON_SERVICE_SSL,
- LlapConfiguration.LLAP_DAEMON_SERVICE_SSL_DEFAULT);
+ final int servicePort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT);
+ final boolean isSSL = HiveConf.getBoolVar(conf, ConfVars.LLAP_DAEMON_WEB_SSL);
final String scheme = isSSL ? "https" : "http";
final URL serviceURL;
try {
@@ -238,8 +231,8 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
@Override
public Resource getResource() {
- int memory = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
- int vCores = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS));
+ int memory = Integer.valueOf(srv.get(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname));
+ int vCores = Integer.valueOf(srv.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname));
return Resource.newInstance(memory, vCores);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f40c1502/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index ac7e20c..08d573b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.CompressionUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.cli.LlapOptionsProcessor.LlapOptions;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
@@ -128,7 +128,7 @@ public class LlapServiceDriver {
// as read by the AM
// if needed, use --hiveconf llap.daemon.service.hosts=@llap0 to dynamically switch between
// instances
- conf.set(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, "@" + options.getName());
+ conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + options.getName());
}
if (options.getSize() != -1) {
@@ -151,11 +151,11 @@ public class LlapServiceDriver {
final long containerSize = options.getSize() / (1024 * 1024);
Preconditions.checkArgument(containerSize >= minAlloc,
"Container size should be greater than minimum allocation(%s)", minAlloc + "m");
- conf.setLong(LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB, containerSize);
+ conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
}
if (options.getExecutors() != -1) {
- conf.setLong(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, options.getExecutors());
+ conf.setLong(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, options.getExecutors());
// TODO: vcpu settings - possibly when DRFA works right
}
@@ -167,7 +167,7 @@ public class LlapServiceDriver {
// Needs more explanation here
// Xmx is not the max heap value in JDK8
// You need to subtract 50% of the survivor fraction from this, to get actual usable memory before it goes into GC
- conf.setLong(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, (long)(options.getXmx())
+ conf.setLong(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, (long)(options.getXmx())
/ (1024 * 1024));
}
@@ -236,9 +236,8 @@ public class LlapServiceDriver {
// extract configs for processing by the python fragments in Slider
JSONObject configs = new JSONObject();
- configs.put(LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB, conf.getInt(
- LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB,
- LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB_DEFAULT));
+ configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, HiveConf.getIntVar(conf,
+ ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB));
configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname,
HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE));
@@ -246,17 +245,14 @@ public class LlapServiceDriver {
configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT.varname,
HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT));
- configs.put(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, conf.getInt(
- LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
- LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT));
+ configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, HiveConf.getIntVar(conf,
+ ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
- configs.put(LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE, conf.getInt(
- LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE,
- LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE_DEFAULT));
+ configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname, HiveConf.getIntVar(conf,
+ ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE));
- configs.put(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, conf.getInt(
- LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
- LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT));
+ configs.put(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, HiveConf.getIntVar(conf,
+ ConfVars.LLAP_DAEMON_NUM_EXECUTORS));
configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1));
http://git-wip-us.apache.org/repos/asf/hive/blob/f40c1502/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 6d54fd4..f6711d8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -39,6 +39,8 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
@@ -113,16 +115,14 @@ public class AMReporter extends AbstractService {
ExecutorService rawExecutor2 = Executors.newFixedThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporterQueueDrainer").build());
this.queueLookupExecutor = MoreExecutors.listeningDecorator(rawExecutor2);
- this.heartbeatInterval =
- conf.getLong(LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS,
- LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT);
-
- this.retryTimeout =
- conf.getLong(LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS,
- LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS_DEFAULT);
- long retrySleep = conf.getLong(
- LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS,
- LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT);
+ this.heartbeatInterval = HiveConf.getTimeVar(
+ conf, ConfVars.LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
+
+ this.retryTimeout = HiveConf.getTimeVar(
+ conf, ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ long retrySleep = HiveConf.getTimeVar(
+ conf, ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
+ TimeUnit.MILLISECONDS);
this.retryPolicy = RetryPolicies
.retryUpToMaximumTimeWithFixedSleep(retryTimeout, retrySleep,
TimeUnit.MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/hive/blob/f40c1502/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 4b28b53..2139bb0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
@@ -98,9 +99,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
this.queryTracker = new QueryTracker(conf, localDirsBase);
addIfService(queryTracker);
- String waitQueueSchedulerClassName =
- conf.get(LlapConfiguration.LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME,
- LlapConfiguration.LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME_DEFAULT);
+ String waitQueueSchedulerClassName = HiveConf.getVar(
+ conf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME);
this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, waitQueueSchedulerClassName,
enablePreemption);
http://git-wip-us.apache.org/repos/asf/hive/blob/f40c1502/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 98b1ccd..dbdf571 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -28,6 +28,7 @@ import javax.management.ObjectName;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
@@ -104,12 +105,10 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
this.numExecutors = numExecutors;
this.localDirs = localDirs;
- int waitQueueSize = daemonConf.getInt(
- LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE,
- LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT);
- boolean enablePreemption = daemonConf.getBoolean(
- LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION,
- LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION_DEFAULT);
+ int waitQueueSize = HiveConf.getIntVar(
+ daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE);
+ boolean enablePreemption = HiveConf.getBoolVar(
+ daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION);
LOG.info("Attempting to start LlapDaemonConf with the following configuration: " +
"numExecutors=" + numExecutors +
", rpcListenerPort=" + rpcPort +
@@ -135,13 +134,11 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
this.shuffleHandlerConf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, shufflePort);
this.shuffleHandlerConf.set(ShuffleHandler.SHUFFLE_HANDLER_LOCAL_DIRS,
StringUtils.arrayToString(localDirs));
- this.shuffleHandlerConf.setBoolean(ShuffleHandler.SHUFFLE_DIR_WATCHER_ENABLED, daemonConf
- .getBoolean(LlapConfiguration.LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED,
- LlapConfiguration.LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED_DEFAULT));
+ this.shuffleHandlerConf.setBoolean(ShuffleHandler.SHUFFLE_DIR_WATCHER_ENABLED,
+ HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED));
// Less frequently set parameter, not passing in as a param.
- int numHandlers = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS,
- LlapConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS_DEFAULT);
+ int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS);
// Initialize the metrics system
LlapMetricsSystem.initialize("LlapDaemon");
@@ -275,18 +272,14 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
// Cache settings will need to be setup in llap-daemon-site.xml - since the daemons don't read hive-site.xml
// Ideally, these properties should be part of LlapDameonConf rather than HiveConf
LlapConfiguration daemonConf = new LlapConfiguration();
- int numExecutors = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
- LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
+ int numExecutors = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
- String[] localDirs =
- daemonConf.getTrimmedStrings(LlapConfiguration.LLAP_DAEMON_WORK_DIRS);
- int rpcPort = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
- LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
+ String[] localDirs = daemonConf.getTrimmedStrings(ConfVars.LLAP_DAEMON_WORK_DIRS.varname);
+ int rpcPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_PORT);
int shufflePort = daemonConf
.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
- long executorMemoryBytes = daemonConf
- .getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
- LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT) * 1024l * 1024l;
+ long executorMemoryBytes = HiveConf.getIntVar(
+ daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l;
long cacheMemoryBytes =
HiveConf.getLongVar(daemonConf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
boolean isDirectCache =
http://git-wip-us.apache.org/repos/asf/hive/blob/f40c1502/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java
index bc18a77..def1f9b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java
@@ -30,6 +30,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.service.AbstractService;
import org.apache.tez.common.CallableWithNdc;
@@ -46,8 +48,8 @@ public class QueryFileCleaner extends AbstractService {
public QueryFileCleaner(Configuration conf, FileSystem localFs) {
super(QueryFileCleaner.class.getName());
- int numCleanerThreads = conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS,
- LlapConfiguration.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS_DEFAULT);
+ int numCleanerThreads = HiveConf.getIntVar(
+ conf, ConfVars.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS);
ScheduledExecutorService rawExecutor = Executors.newScheduledThreadPool(numCleanerThreads,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryFileCleaner %d").build());
this.executorService = MoreExecutors.listeningDecorator(rawExecutor);
http://git-wip-us.apache.org/repos/asf/hive/blob/f40c1502/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 2db2833..33d5671 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -16,6 +16,8 @@ package org.apache.hadoop.hive.llap.daemon.impl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
@@ -31,6 +33,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
@@ -83,8 +86,8 @@ public class QueryTracker extends CompositeService {
throw new RuntimeException("Failed to setup local filesystem instance", e);
}
- this.defaultDeleteDelaySeconds = conf.getLong(LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS,
- LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS_DEFAULT);
+ this.defaultDeleteDelaySeconds = HiveConf.getTimeVar(
+ conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS);
queryFileCleaner = new QueryFileCleaner(conf, localFs);
addService(queryFileCleaner);
http://git-wip-us.apache.org/repos/asf/hive/blob/f40c1502/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
index 37910be..7856663 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
@@ -18,7 +18,8 @@
package org.apache.hadoop.hive.llap.daemon.services.impl;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.webapp.WebApp;
@@ -43,10 +44,8 @@ public class LlapWebServices extends AbstractService {
this.conf = new Configuration(conf);
this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
- this.port = conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_PORT,
- LlapConfiguration.LLAP_DAEMON_SERVICE_PORT_DEFAULT);
- this.ssl = conf.getBoolean(LlapConfiguration.LLAP_DAEMON_SERVICE_SSL,
- LlapConfiguration.LLAP_DAEMON_SERVICE_SSL_DEFAULT);
+ this.port = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT);
+ this.ssl = HiveConf.getBoolVar(conf, ConfVars.LLAP_DAEMON_WEB_SSL);
this.webAppInstance = new LlapWebApp();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f40c1502/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index d327fc0..b93650d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -22,6 +22,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.collect.BiMap;
@@ -29,6 +30,8 @@ import com.google.common.collect.HashBiMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
@@ -113,11 +116,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
public void initialize() throws Exception {
super.initialize();
Configuration conf = getConf();
- int numThreads = conf.getInt(LlapConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS,
- LlapConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT);
+ int numThreads = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS);
this.communicator = new TaskCommunicator(numThreads, conf);
- this.deleteDelayOnDagComplete = conf.getLong(LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS,
- LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS_DEFAULT);
+ this.deleteDelayOnDagComplete = HiveConf.getTimeVar(
+ conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS);
LOG.info("Running LlapTaskCommunicator with "
+ "fileCleanupDelay=" + deleteDelayOnDagComplete
+ ", numCommunicatorThreads=" + numThreads);
http://git-wip-us.apache.org/repos/asf/hive/blob/f40c1502/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
index 33e998c..8144165 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
@@ -41,6 +41,8 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.Message;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
@@ -79,14 +81,13 @@ public class TaskCommunicator extends AbstractService {
this.hostProxies = new ConcurrentHashMap<>();
this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
- long connectionTimeout =
- conf.getLong(LlapConfiguration.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MILLIS,
- LlapConfiguration.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MILLIS_DEFAULT);
- long retrySleep = conf.getLong(
- LlapConfiguration.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS,
- LlapConfiguration.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT);
- this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(connectionTimeout, retrySleep,
+ long connectionTimeout = HiveConf.getTimeVar(conf,
+ ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ long retrySleep = HiveConf.getTimeVar(conf,
+ ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
TimeUnit.MILLISECONDS);
+ this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
+ connectionTimeout, retrySleep, TimeUnit.MILLISECONDS);
this.requestManager = new RequestManager(numThreads);
ExecutorService localExecutor = Executors.newFixedThreadPool(1,
http://git-wip-us.apache.org/repos/asf/hive/blob/f40c1502/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index e920f86..9821117 100644
--- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -50,6 +50,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
@@ -170,34 +172,26 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(),
taskSchedulerContext.getCustomClusterIdentifier());
- this.memoryPerInstance =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
- LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT);
- this.coresPerInstance =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE,
- LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE_DEFAULT);
- this.executorsPerInstance =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
- LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
+ this.memoryPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB);
+ this.coresPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE);
+ this.executorsPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
this.nodeBlacklistConf = new NodeBlacklistConf(
- conf.getLong(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS,
- LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS_DEFAULT),
- conf.getLong(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS,
- LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS_DEFAULT),
- conf.getFloat(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR,
- LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR_DEFAULT));
+ HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS,
+ TimeUnit.MILLISECONDS),
+ HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MS,
+ TimeUnit.MILLISECONDS),
+ HiveConf.getFloatVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR));
- this.numSchedulableTasksPerNode = conf.getInt(
- LlapConfiguration.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE,
- LlapConfiguration.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE_DEFAULT);
+ this.numSchedulableTasksPerNode =
+ HiveConf.getIntVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE);
int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance);
int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance);
this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, coresPerExecutor);
- String instanceId = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
+ String instanceId = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
- Preconditions.checkNotNull(instanceId, LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS
+ Preconditions.checkNotNull(instanceId, ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname
+ " must be defined");
ExecutorService executorServiceRaw =
@@ -999,7 +993,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
if (numSchedulableTasksConf == 0) {
int pendingQueueuCapacity = 0;
String pendingQueueCapacityString = serviceInstance.getProperties()
- .get(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE);
+ .get(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname);
if (LOG.isDebugEnabled()) {
LOG.debug("Setting up node: " + serviceInstance + ", with available capacity=" +
serviceInstance.getResource().getVirtualCores() + ", pendingQueueCapacity=" +
http://git-wip-us.apache.org/repos/asf/hive/blob/f40c1502/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index 4525ab9..52ba360 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon;
import org.apache.hadoop.service.AbstractService;
@@ -150,16 +151,16 @@ public class MiniLlapCluster extends AbstractService {
public void serviceStart() {
llapDaemon.start();
- clusterSpecificConfiguration.set(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS,
+ clusterSpecificConfiguration.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname,
getServiceAddress().getHostName());
- clusterSpecificConfiguration.setInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
+ clusterSpecificConfiguration.setInt(ConfVars.LLAP_DAEMON_RPC_PORT.varname,
getServiceAddress().getPort());
clusterSpecificConfiguration.setInt(
- LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
+ ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
numExecutorsPerService);
clusterSpecificConfiguration.setLong(
- LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, execBytesPerService);
+ ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, execBytesPerService);
// Optimize local fetch does not work with LLAP due to different local directories
// used by containers and LLAP
clusterSpecificConfiguration
http://git-wip-us.apache.org/repos/asf/hive/blob/f40c1502/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
index 8d45c95..bf8a673 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
@@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicReference;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
@@ -33,10 +35,8 @@ public class TestLlapDaemonProtocolServerImpl {
@Test(timeout = 10000)
public void test() throws ServiceException {
LlapConfiguration daemonConf = new LlapConfiguration();
- int rpcPort = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
- LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
- int numHandlers = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS,
- LlapConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS_DEFAULT);
+ int rpcPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_PORT);
+ int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS);
LlapDaemonProtocolServerImpl server =
new LlapDaemonProtocolServerImpl(numHandlers, mock(ContainerRunner.class),
new AtomicReference<InetSocketAddress>(), rpcPort);
http://git-wip-us.apache.org/repos/asf/hive/blob/f40c1502/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
index 23724a4..4eccc06 100644
--- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
+++ b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
@@ -33,6 +33,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -323,17 +324,17 @@ public class TestLlapTaskSchedulerService {
TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) throws IOException,
InterruptedException {
this(disableTimeoutMillis, new String[]{HOST1, HOST2, HOST3}, 4,
- LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT);
+ ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.defaultIntVal);
}
TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize) throws
IOException, InterruptedException {
conf = new Configuration();
- conf.setStrings(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, hosts);
- conf.setInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, numExecutors);
- conf.setInt(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE, waitQueueSize);
- conf.setLong(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS,
- disableTimeoutMillis);
+ conf.setStrings(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, hosts);
+ conf.setInt(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, numExecutors);
+ conf.setInt(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname, waitQueueSize);
+ conf.set(ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS.varname,
+ disableTimeoutMillis + "ms");
conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, false);
doReturn(appAttemptId).when(mockAppCallback).getApplicationAttemptId();