You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2017/02/06 21:25:16 UTC

[2/2] hive git commit: HIVE-15801. Some logging improvements in LlapTaskScheduler. (Siddharth Seth, reviewed by Sergey Shelukhin)

HIVE-15801. Some logging improvements in LlapTaskScheduler. (Siddharth Seth, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/def0cdea
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/def0cdea
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/def0cdea

Branch: refs/heads/master
Commit: def0cdeae932c6ac3c8b3a5f6f9f4e6adb268cc4
Parents: 29e671e
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Feb 6 13:24:46 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Feb 6 13:24:46 2017 -0800

----------------------------------------------------------------------
 .../hive/llap/daemon/impl/LlapDaemon.java       |   6 +
 .../tezplugins/LlapTaskSchedulerService.java    | 114 ++++++++++++-------
 2 files changed, 80 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/def0cdea/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index cca6bc6..e737fdd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -380,6 +380,12 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     }
     getConfig().setInt(ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT.varname, LlapOutputFormatService.get().getPort());
 
+    // Ensure this is set in the config so that the AM can read it.
+    getConfig()
+        .setIfUnset(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname,
+            ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE
+                .getDefaultValue());
+
     this.registry.init(getConfig());
     this.registry.start();
     LOG.info(

http://git-wip-us.apache.org/repos/asf/hive/blob/def0cdea/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 3c0a661..dc594a2 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -134,9 +134,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   @VisibleForTesting
   final DelayQueue<TaskInfo> delayedTaskQueue = new DelayQueue<>();
 
+  private volatile boolean dagRunning = false;
 
   private final ContainerFactory containerFactory;
-  private final Random random = new Random();
   @VisibleForTesting
   final Clock clock;
 
@@ -167,20 +167,14 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   private final NodeBlacklistConf nodeBlacklistConf;
   private final LocalityDelayConf localityDelayConf;
 
-  // Per daemon
-  private final int memoryPerInstance;
-  private final int coresPerInstance;
-  private final int executorsPerInstance;
-
   private final int numSchedulableTasksPerNode;
 
-  // Per Executor Thread
-  private final Resource resourcePerExecutor;
 
   // when there are no live nodes in the cluster and this timeout elapses the query is failed
   private final long timeout;
   private final Lock timeoutLock = new ReentrantLock();
   private final ScheduledExecutorService timeoutExecutor;
+  private final ScheduledExecutorService scheduledLoggingExecutor;
   private final SchedulerTimeoutMonitor timeoutMonitor;
   private ScheduledFuture<?> timeoutFuture;
 
@@ -221,9 +215,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     // TODO HIVE-13483 Get all of these properties from the registry. This will need to take care of different instances
     // publishing potentially different values when we support changing configurations dynamically.
     // For now, this can simply be fetched from a single registry instance.
-    this.memoryPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB);
-    this.coresPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE);
-    this.executorsPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
     this.nodeBlacklistConf = new NodeBlacklistConf(
         HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS,
             TimeUnit.MILLISECONDS),
@@ -247,9 +238,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
             .build());
     this.timeoutFuture = null;
 
-    int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance);
-    int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance);
-    this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, coresPerExecutor);
+    this.scheduledLoggingExecutor = Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapTaskSchedulerTimedLogThread")
+            .build());
 
     String instanceId = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
 
@@ -280,19 +271,16 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       String sessionId = conf.get("llap.daemon.metrics.sessionid");
       // TODO: Not sure about the use of this. Should we instead use workerIdentity as sessionId?
       this.metrics = LlapTaskSchedulerMetrics.create(displayName, sessionId);
-      this.metrics.setNumExecutors(executorsPerInstance);
-      this.metrics.setMemoryPerInstance(memoryPerInstance * 1024L * 1024L);
-      this.metrics.setCpuCoresPerInstance(coresPerExecutor);
     } else {
       this.metrics = null;
       this.pauseMonitor = null;
     }
 
-    LOG.info("Running with configuration: " + "memoryPerInstance=" + memoryPerInstance
-        + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance="
-        + executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor
-        + ", nodeBlacklistConf=" + nodeBlacklistConf
-        + ", localityDelayMs=" + localityDelayMs);
+    String hostsString = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+    LOG.info(
+        "Running with configuration: hosts={}, numSchedulableTasksPerNode={}, nodeBlacklistConf={}, localityConf={}",
+        hostsString, numSchedulableTasksPerNode, nodeBlacklistConf, localityDelayConf);
+
   }
 
   @Override
@@ -304,6 +292,21 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   public void start() throws IOException {
     writeLock.lock();
     try {
+      scheduledLoggingExecutor.schedule(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          readLock.lock();
+          try {
+            if (dagRunning) {
+              LOG.info("Stats for current dag: {}", dagStats);
+            }
+          } finally {
+            readLock.unlock();
+          }
+          return null;
+        }
+      }, 10000L, TimeUnit.MILLISECONDS);
+
       nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable);
       Futures.addCallback(nodeEnablerFuture, new LoggingFutureCallback("NodeEnablerThread", LOG));
 
@@ -344,12 +347,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       LOG.warn(
           "Not expecing Updates from the registry. Received update for instance={}. Ignoring",
           serviceInstance);
-//     Replacing NodeInfo means we end up discarding whatever state was known about that node.
-//     instanceToNodeMap.put(serviceInstance.getWorkerIdentity(), new NodeInfo(serviceInstance,
-//          nodeBlacklistConf, clock, numSchedulableTasksPerNode, metrics));
-//
-//
-//    LOG.info("Updated node with identity: {}", serviceInstance.getWorkerIdentity());
     }
 
     @Override
@@ -408,6 +405,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     writeLock.lock();
     try {
       if (!this.isStopped.getAndSet(true)) {
+        scheduledLoggingExecutor.shutdownNow();
+
         nodeEnablerCallable.shutdown();
         if (nodeEnablerFuture != null) {
           nodeEnablerFuture.cancel(true);
@@ -526,7 +525,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     if (metrics != null) {
       metrics.incrCompletedDagCount();
     }
-    dagStats = new StatsPerDag();
+    writeLock.lock();
+    try {
+      dagRunning = false;
+      dagStats = new StatsPerDag();
+    } finally {
+      writeLock.unlock();
+    }
     // TODO Cleanup pending tasks etc, so that the next dag is not affected.
   }
 
@@ -554,6 +559,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         priority, capability, Arrays.toString(hosts));
     writeLock.lock();
     try {
+      dagRunning = true;
       dagStats.registerTaskRequest(hosts, racks);
     } finally {
       writeLock.unlock();
@@ -573,6 +579,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         priority, capability, containerId);
     writeLock.lock();
     try {
+      dagRunning = true;
       dagStats.registerTaskRequest(null, null);
     } finally {
       writeLock.unlock();
@@ -621,7 +628,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           taskInfo.getState(), endReason);
       // Re-enable the node if preempted
       if (taskInfo.getState() == TaskInfo.State.PREEMPTED) {
-        LOG.info("Processing deallocateTask for {} which was preempted, EndReason={}", task, endReason);
         unregisterPendingPreemption(taskInfo.assignedNode.getHost());
         nodeInfo.registerUnsuccessfulTaskEnd(true);
         if (nodeInfo.isDisabled()) {
@@ -1051,7 +1057,10 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           }
           taskInfo.triedAssigningTask();
           ScheduleResult scheduleResult = scheduleTask(taskInfo, totalResource);
-          LOG.info("ScheduleResult for Task: {} = {}", taskInfo, scheduleResult);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("ScheduleResult for Task: {} = {}", taskInfo,
+                scheduleResult);
+          }
           if (scheduleResult == ScheduleResult.SCHEDULED) {
             taskIter.remove();
           } else {
@@ -1185,13 +1194,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     if (selectHostResult.scheduleResult == ScheduleResult.SCHEDULED) {
       NodeInfo nodeInfo = selectHostResult.nodeInfo;
       Container container =
-          containerFactory.createContainer(resourcePerExecutor, taskInfo.priority,
+          containerFactory.createContainer(nodeInfo.getResourcePerExecutor(), taskInfo.priority,
               nodeInfo.getHost(),
               nodeInfo.getRpcPort(),
               nodeInfo.getServiceAddress());
       writeLock.lock(); // While updating local structures
       try {
-        LOG.info("Assigned task={} on node={}, to container={} on node={}",
+        LOG.info("Assigned task={} on node={}, to container={}",
             taskInfo, nodeInfo.toShortString(), container.getId());
         dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks,
             nodeInfo.getHost());
@@ -1498,6 +1507,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     private int numScheduledTasks = 0;
     private final int numSchedulableTasks;
     private final LlapTaskSchedulerMetrics metrics;
+    private final Resource resourcePerExecutor;
 
     /**
      * Create a NodeInfo bound to a service instance
@@ -1516,6 +1526,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       this.clock = clock;
       this.metrics = metrics;
 
+      int numVcores = serviceInstance.getResource().getVirtualCores();
+      int memoryPerInstance = serviceInstance.getResource().getMemory();
+      int memoryPerExecutor = (int)(memoryPerInstance / (double) numVcores);
+      resourcePerExecutor = Resource.newInstance(memoryPerExecutor, 1);
+
       if (numSchedulableTasksConf == 0) {
         int pendingQueueuCapacity = 0;
         String pendingQueueCapacityString = serviceInstance.getProperties()
@@ -1526,7 +1541,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         if (pendingQueueCapacityString != null) {
           pendingQueueuCapacity = Integer.parseInt(pendingQueueCapacityString);
         }
-        this.numSchedulableTasks = serviceInstance.getResource().getVirtualCores() + pendingQueueuCapacity;
+        this.numSchedulableTasks = numVcores + pendingQueueuCapacity;
       } else {
         this.numSchedulableTasks = numSchedulableTasksConf;
         LOG.info("Setting up node: " + serviceInstance + " with schedulableCapacity=" + this.numSchedulableTasks);
@@ -1553,6 +1568,10 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       return serviceInstance.getServicesAddress();
     }
 
+    public Resource getResourcePerExecutor() {
+      return resourcePerExecutor;
+    }
+
     void enableNode() {
       expireTimeMillis = -1;
       disabled = false;
@@ -1634,21 +1653,36 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       return hadCommFailure;
     }
 
+    int canAcceptCounter = 0;
     /* Returning true does not guarantee that the task will run, considering other queries
     may be running in the system. Also depends upon the capacity usage configuration
      */
     boolean canAcceptTask() {
       boolean result = !hadCommFailure && !disabled
           &&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Node[" + serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", " +
-                serviceInstance.getWorkerIdentity() + "]: " +
-                "canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}",
-            result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(constructCanAcceptLogResult(result));
+      }
+      // Count every call so the node state is also logged at INFO once per 10000 calls.
+      if (++canAcceptCounter >= 10000) {
+        LOG.info(constructCanAcceptLogResult(result));
+        canAcceptCounter = 0;
       }
       return result;
     }
 
+    String constructCanAcceptLogResult(boolean result) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Node[").append(serviceInstance.getHost()).append(":").append(serviceInstance.getRpcPort())
+          .append(", ").append(serviceInstance.getWorkerIdentity()).append("]: ")
+          .append("canAcceptTask=").append(result)
+          .append(", numScheduledTasks=").append(numScheduledTasks)
+          .append(", numSchedulableTasks=").append(numSchedulableTasks)
+          .append(", hadCommFailure=").append(hadCommFailure)
+          .append(", disabled=").append(disabled);
+      return sb.toString();
+    }
+
     @Override
     public long getDelay(TimeUnit unit) {
       return unit.convert(expireTimeMillis - clock.getTime(), TimeUnit.MILLISECONDS);
@@ -1911,7 +1945,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           ", priority=" + priority +
           ", startTime=" + startTime +
           ", containerId=" + containerId +
-          ", assignedNode=" + (assignedNode == null ? "" : assignedNode.toShortString()) +
+          (assignedNode != null ? ", assignedNode=" + assignedNode.toShortString() : "") +
           ", uniqueId=" + uniqueId +
           ", localityDelayTimeout=" + localityDelayTimeout +
           '}';