You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/02/07 20:59:21 UTC
[53/70] [abbrv] hive git commit: HIVE-15801. Some logging
improvements in LlapTaskScheduler. (Siddharth Seth,
reviewed by Sergey Shelukhin)
HIVE-15801. Some logging improvements in LlapTaskScheduler. (Siddharth Seth, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/def0cdea
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/def0cdea
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/def0cdea
Branch: refs/heads/hive-14535
Commit: def0cdeae932c6ac3c8b3a5f6f9f4e6adb268cc4
Parents: 29e671e
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Feb 6 13:24:46 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Feb 6 13:24:46 2017 -0800
----------------------------------------------------------------------
.../hive/llap/daemon/impl/LlapDaemon.java | 6 +
.../tezplugins/LlapTaskSchedulerService.java | 114 ++++++++++++-------
2 files changed, 80 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/def0cdea/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index cca6bc6..e737fdd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -380,6 +380,12 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
}
getConfig().setInt(ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT.varname, LlapOutputFormatService.get().getPort());
+ // Ensure this is set in the config so that the AM can read it.
+ getConfig()
+ .setIfUnset(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname,
+ ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE
+ .getDefaultValue());
+
this.registry.init(getConfig());
this.registry.start();
LOG.info(
http://git-wip-us.apache.org/repos/asf/hive/blob/def0cdea/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 3c0a661..dc594a2 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -134,9 +134,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
@VisibleForTesting
final DelayQueue<TaskInfo> delayedTaskQueue = new DelayQueue<>();
+ private volatile boolean dagRunning = false;
private final ContainerFactory containerFactory;
- private final Random random = new Random();
@VisibleForTesting
final Clock clock;
@@ -167,20 +167,14 @@ public class LlapTaskSchedulerService extends TaskScheduler {
private final NodeBlacklistConf nodeBlacklistConf;
private final LocalityDelayConf localityDelayConf;
- // Per daemon
- private final int memoryPerInstance;
- private final int coresPerInstance;
- private final int executorsPerInstance;
-
private final int numSchedulableTasksPerNode;
- // Per Executor Thread
- private final Resource resourcePerExecutor;
// when there are no live nodes in the cluster and this timeout elapses the query is failed
private final long timeout;
private final Lock timeoutLock = new ReentrantLock();
private final ScheduledExecutorService timeoutExecutor;
+ private final ScheduledExecutorService scheduledLoggingExecutor;
private final SchedulerTimeoutMonitor timeoutMonitor;
private ScheduledFuture<?> timeoutFuture;
@@ -221,9 +215,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
// TODO HIVE-13483 Get all of these properties from the registry. This will need to take care of different instances
// publishing potentially different values when we support changing configurations dynamically.
// For now, this can simply be fetched from a single registry instance.
- this.memoryPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB);
- this.coresPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE);
- this.executorsPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
this.nodeBlacklistConf = new NodeBlacklistConf(
HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS,
TimeUnit.MILLISECONDS),
@@ -247,9 +238,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
.build());
this.timeoutFuture = null;
- int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance);
- int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance);
- this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, coresPerExecutor);
+ this.scheduledLoggingExecutor = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapTaskSchedulerTimedLogThread")
+ .build());
String instanceId = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
@@ -280,19 +271,16 @@ public class LlapTaskSchedulerService extends TaskScheduler {
String sessionId = conf.get("llap.daemon.metrics.sessionid");
// TODO: Not sure about the use of this. Should we instead use workerIdentity as sessionId?
this.metrics = LlapTaskSchedulerMetrics.create(displayName, sessionId);
- this.metrics.setNumExecutors(executorsPerInstance);
- this.metrics.setMemoryPerInstance(memoryPerInstance * 1024L * 1024L);
- this.metrics.setCpuCoresPerInstance(coresPerExecutor);
} else {
this.metrics = null;
this.pauseMonitor = null;
}
- LOG.info("Running with configuration: " + "memoryPerInstance=" + memoryPerInstance
- + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance="
- + executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor
- + ", nodeBlacklistConf=" + nodeBlacklistConf
- + ", localityDelayMs=" + localityDelayMs);
+ String hostsString = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+ LOG.info(
+ "Running with configuration: hosts={}, numSchedulableTasksPerNode={}, nodeBlacklistConf={}, localityConf={}",
+ hostsString, numSchedulableTasksPerNode, nodeBlacklistConf, localityDelayConf);
+
}
@Override
@@ -304,6 +292,21 @@ public class LlapTaskSchedulerService extends TaskScheduler {
public void start() throws IOException {
writeLock.lock();
try {
+ scheduledLoggingExecutor.schedule(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ readLock.lock();
+ try {
+ if (dagRunning) {
+ LOG.info("Stats for current dag: {}", dagStats);
+ }
+ } finally {
+ readLock.unlock();
+ }
+ return null;
+ }
+ }, 10000L, TimeUnit.MILLISECONDS);
+
nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable);
Futures.addCallback(nodeEnablerFuture, new LoggingFutureCallback("NodeEnablerThread", LOG));
@@ -344,12 +347,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
LOG.warn(
"Not expecing Updates from the registry. Received update for instance={}. Ignoring",
serviceInstance);
-// Replacing NodeInfo means we end up discarding whatever state was known about that node.
-// instanceToNodeMap.put(serviceInstance.getWorkerIdentity(), new NodeInfo(serviceInstance,
-// nodeBlacklistConf, clock, numSchedulableTasksPerNode, metrics));
-//
-//
-// LOG.info("Updated node with identity: {}", serviceInstance.getWorkerIdentity());
}
@Override
@@ -408,6 +405,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
writeLock.lock();
try {
if (!this.isStopped.getAndSet(true)) {
+ scheduledLoggingExecutor.shutdownNow();
+
nodeEnablerCallable.shutdown();
if (nodeEnablerFuture != null) {
nodeEnablerFuture.cancel(true);
@@ -526,7 +525,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
if (metrics != null) {
metrics.incrCompletedDagCount();
}
- dagStats = new StatsPerDag();
+ writeLock.lock();
+ try {
+ dagRunning = false;
+ dagStats = new StatsPerDag();
+ } finally {
+ writeLock.unlock();
+ }
// TODO Cleanup pending tasks etc, so that the next dag is not affected.
}
@@ -554,6 +559,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
priority, capability, Arrays.toString(hosts));
writeLock.lock();
try {
+ dagRunning = true;
dagStats.registerTaskRequest(hosts, racks);
} finally {
writeLock.unlock();
@@ -573,6 +579,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
priority, capability, containerId);
writeLock.lock();
try {
+ dagRunning = true;
dagStats.registerTaskRequest(null, null);
} finally {
writeLock.unlock();
@@ -621,7 +628,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
taskInfo.getState(), endReason);
// Re-enable the node if preempted
if (taskInfo.getState() == TaskInfo.State.PREEMPTED) {
- LOG.info("Processing deallocateTask for {} which was preempted, EndReason={}", task, endReason);
unregisterPendingPreemption(taskInfo.assignedNode.getHost());
nodeInfo.registerUnsuccessfulTaskEnd(true);
if (nodeInfo.isDisabled()) {
@@ -1051,7 +1057,10 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
taskInfo.triedAssigningTask();
ScheduleResult scheduleResult = scheduleTask(taskInfo, totalResource);
- LOG.info("ScheduleResult for Task: {} = {}", taskInfo, scheduleResult);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ScheduleResult for Task: {} = {}", taskInfo,
+ scheduleResult);
+ }
if (scheduleResult == ScheduleResult.SCHEDULED) {
taskIter.remove();
} else {
@@ -1185,13 +1194,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
if (selectHostResult.scheduleResult == ScheduleResult.SCHEDULED) {
NodeInfo nodeInfo = selectHostResult.nodeInfo;
Container container =
- containerFactory.createContainer(resourcePerExecutor, taskInfo.priority,
+ containerFactory.createContainer(nodeInfo.getResourcePerExecutor(), taskInfo.priority,
nodeInfo.getHost(),
nodeInfo.getRpcPort(),
nodeInfo.getServiceAddress());
writeLock.lock(); // While updating local structures
try {
- LOG.info("Assigned task={} on node={}, to container={} on node={}",
+ LOG.info("Assigned task={} on node={}, to container={}",
taskInfo, nodeInfo.toShortString(), container.getId());
dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks,
nodeInfo.getHost());
@@ -1498,6 +1507,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
private int numScheduledTasks = 0;
private final int numSchedulableTasks;
private final LlapTaskSchedulerMetrics metrics;
+ private final Resource resourcePerExecutor;
/**
* Create a NodeInfo bound to a service instance
@@ -1516,6 +1526,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
this.clock = clock;
this.metrics = metrics;
+ int numVcores = serviceInstance.getResource().getVirtualCores();
+ int memoryPerInstance = serviceInstance.getResource().getMemory();
+ int memoryPerExecutor = (int)(memoryPerInstance / (double) numVcores);
+ resourcePerExecutor = Resource.newInstance(memoryPerExecutor, 1);
+
if (numSchedulableTasksConf == 0) {
int pendingQueueuCapacity = 0;
String pendingQueueCapacityString = serviceInstance.getProperties()
@@ -1526,7 +1541,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
if (pendingQueueCapacityString != null) {
pendingQueueuCapacity = Integer.parseInt(pendingQueueCapacityString);
}
- this.numSchedulableTasks = serviceInstance.getResource().getVirtualCores() + pendingQueueuCapacity;
+ this.numSchedulableTasks = numVcores + pendingQueueuCapacity;
} else {
this.numSchedulableTasks = numSchedulableTasksConf;
LOG.info("Setting up node: " + serviceInstance + " with schedulableCapacity=" + this.numSchedulableTasks);
@@ -1553,6 +1568,10 @@ public class LlapTaskSchedulerService extends TaskScheduler {
return serviceInstance.getServicesAddress();
}
+ public Resource getResourcePerExecutor() {
+ return this.resourcePerExecutor; // per-executor Resource built once in the NodeInfo constructor
+ }
+
void enableNode() {
expireTimeMillis = -1;
disabled = false;
@@ -1634,21 +1653,36 @@ public class LlapTaskSchedulerService extends TaskScheduler {
return hadCommFailure;
}
+ int canAcceptCounter = 0; // canAcceptTask() calls since the last periodic INFO log
/* Returning true does not guarantee that the task will run, considering other queries
may be running in the system. Also depends upon the capacity usage configuration
*/
boolean canAcceptTask() {
boolean result = !hadCommFailure && !disabled
&&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0));
- if (LOG.isDebugEnabled()) {
- LOG.debug("Node[" + serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", " +
- serviceInstance.getWorkerIdentity() + "]: " +
- "canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}",
- result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(constructCanAcceptLogResult(result));
+ }
+ // Count every call and emit a single INFO line once per 10000 calls, then start over.
+ if (++canAcceptCounter >= 10000) {
+ LOG.info(constructCanAcceptLogResult(result));
+ canAcceptCounter = 0;
}
return result;
}
+ String constructCanAcceptLogResult(boolean result) {
+ // Identify the node by host:rpcPort and its worker identity.
+ String nodeLabel = "Node[" + serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", " +
+ serviceInstance.getWorkerIdentity() + "]: ";
+ // Append the decision and the state fields that feed it.
+ return nodeLabel + "canAcceptTask=" + result +
+ ", numScheduledTasks=" + numScheduledTasks +
+ ", numSchedulableTasks=" + numSchedulableTasks +
+ ", hadCommFailure=" + hadCommFailure +
+ ", disabled=" + disabled;
+ }
+
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expireTimeMillis - clock.getTime(), TimeUnit.MILLISECONDS);
@@ -1911,7 +1945,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
", priority=" + priority +
", startTime=" + startTime +
", containerId=" + containerId +
- ", assignedNode=" + (assignedNode == null ? "" : assignedNode.toShortString()) +
+ (assignedNode != null ? "assignedNode=" + assignedNode.toShortString() : "") +
", uniqueId=" + uniqueId +
", localityDelayTimeout=" + localityDelayTimeout +
'}';