Posted to commits@hive.apache.org by pr...@apache.org on 2016/04/28 21:11:28 UTC

hive git commit: HIVE-13536: LLAP: Add metrics for task scheduler

Repository: hive
Updated Branches:
  refs/heads/master 8bdf618f5 -> 0ebcd938c


HIVE-13536: LLAP: Add metrics for task scheduler


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0ebcd938
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0ebcd938
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0ebcd938

Branch: refs/heads/master
Commit: 0ebcd938cf0da7ef74ef3534a3aef4b7976a1163
Parents: 8bdf618
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Thu Apr 28 14:10:13 2016 -0500
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Thu Apr 28 14:10:13 2016 -0500

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   8 +
 .../hadoop/hive/llap/cache/BuddyAllocator.java  |   5 +-
 .../llap/daemon/impl/ContainerRunnerImpl.java   |   7 +-
 .../hive/llap/daemon/impl/LlapDaemon.java       |  20 +-
 .../llap/daemon/impl/TaskExecutorService.java   |  53 ++++-
 .../llap/daemon/impl/TaskRunnerCallable.java    |  28 ++-
 .../hive/llap/io/api/impl/LlapIoImpl.java       |   1 -
 .../llap/io/decode/EncodedDataConsumer.java     |   5 -
 .../llap/io/encoded/OrcEncodedDataReader.java   |   3 -
 .../hive/llap/metrics/LlapDaemonCacheInfo.java  |   6 +-
 .../llap/metrics/LlapDaemonCacheMetrics.java    |  35 +---
 .../llap/metrics/LlapDaemonExecutorInfo.java    |  23 ++-
 .../llap/metrics/LlapDaemonExecutorMetrics.java | 152 +++++++++++---
 .../hive/llap/metrics/LlapDaemonIOInfo.java     |   4 -
 .../hive/llap/metrics/LlapDaemonIOMetrics.java  |  37 +---
 .../daemon/impl/TestTaskExecutorService.java    |   2 +-
 .../tezplugins/LlapTaskSchedulerService.java    | 122 ++++++++++--
 .../metrics/LlapTaskSchedulerInfo.java          |  59 ++++++
 .../metrics/LlapTaskSchedulerMetrics.java       | 197 +++++++++++++++++++
 .../TestLlapTaskSchedulerService.java           |   2 +-
 20 files changed, 614 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index eeb9b84..566e9b6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -330,6 +330,7 @@ public class HiveConf extends Configuration {
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION.varname);
+    llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_WEB_PORT.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_WEB_SSL.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
@@ -2763,6 +2764,13 @@ public class HiveConf extends Configuration {
             " to a location other than the ones requested. Set to -1 for an infinite delay, 0" +
             "for a no delay. Currently these are the only two supported values"
     ),
+    LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS(
+        "hive.llap.daemon.task.preemption.metrics.intervals", "30,60,300",
+        "Comma-delimited set of integers denoting the desired rollover intervals (in seconds)\n" +
+        " for percentile latency metrics. Used by LLAP daemon task scheduler metrics for\n" +
+        " time taken to kill task (due to pre-emption) and useful time wasted by the task that\n" +
+        " is about to be preempted."
+    ),
     LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE("hive.llap.daemon.task.scheduler.wait.queue.size",
       10, "LLAP scheduler maximum queue size.", "llap.daemon.task.scheduler.wait.queue.size"),
     LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME(

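The new hive.llap.daemon.task.preemption.metrics.intervals setting can be read back with the same HiveConf accessor the daemon uses. A minimal sketch (the interval values here are illustrative, not recommendations):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.conf.HiveConf;

    public class PreemptionIntervalsExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Override the default "30,60,300" rollover intervals (in seconds).
        conf.setVar(HiveConf.ConfVars.LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS, "10,60,600");

        // Read the value back the way LlapDaemon does, skipping anything non-numeric.
        String[] strIntervals = HiveConf.getTrimmedStringsVar(conf,
            HiveConf.ConfVars.LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS);
        List<Integer> intervals = new ArrayList<>();
        for (String s : strIntervals) {
          try {
            intervals.add(Integer.valueOf(s));
          } catch (NumberFormatException e) {
            // The daemon logs and ignores invalid entries; this sketch simply skips them.
          }
        }
        System.out.println("Percentile rollover intervals (s): " + intervals);
      }
    }
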
http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 9f7e5c9..d78c1e0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -40,7 +40,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
   private final long maxSize;
   private final boolean isDirect;
   private final LlapDaemonCacheMetrics metrics;
-  
+
   // We don't know the acceptable size for Java array, so we'll use 1Gb boundary.
   // That is guaranteed to fit any maximum allocation.
   private static final int MAX_ARENA_SIZE = 1024*1024*1024;
@@ -113,9 +113,6 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
 
     this.metrics = metrics;
     metrics.incrAllocatedArena();
-    metrics.setArenaSize(arenaSize);
-    metrics.setMinAllocationSize(minAllocation);
-    metrics.setMaxAllocationSize(maxAllocation);
   }
 
   // TODO: would it make sense to return buffers asynchronously?

http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index e80fb15..3d45c7a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -106,7 +106,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
     String waitQueueSchedulerClassName = HiveConf.getVar(
         conf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME);
     this.executorService = new TaskExecutorService(numExecutors, waitQueueSize,
-        waitQueueSchedulerClassName, enablePreemption, classLoader);
+        waitQueueSchedulerClassName, enablePreemption, classLoader, metrics);
 
     addIfService(executorService);
 
@@ -218,8 +218,9 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
             .setSubmissionState(SubmissionStateProto.valueOf(submissionState.name()))
             .build();
       }
-      metrics.incrExecutorTotalRequestsHandled();
-      metrics.incrExecutorNumQueuedRequests();
+      if (metrics != null) {
+        metrics.incrExecutorTotalRequestsHandled();
+      }
     } finally {
       NDC.pop();
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 33b41e8..63cb16b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -20,7 +20,9 @@ import java.lang.management.MemoryPoolMXBean;
 import java.lang.management.MemoryType;
 import java.net.InetSocketAddress;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -68,6 +70,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
 
 public class LlapDaemon extends CompositeService implements ContainerRunner, LlapDaemonMXBean {
 
@@ -193,12 +196,25 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     String displayName = "LlapDaemonExecutorMetrics-" + MetricsUtils.getHostName();
     String sessionId = MetricsUtils.getUUID();
     daemonConf.set("llap.daemon.metrics.sessionid", sessionId);
-    this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors);
+    String[] strIntervals = HiveConf.getTrimmedStringsVar(daemonConf,
+        HiveConf.ConfVars.LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS);
+    List<Integer> intervalList = new ArrayList<>();
+    if (strIntervals != null) {
+      for (String strInterval : strIntervals) {
+        try {
+          intervalList.add(Integer.valueOf(strInterval));
+        } catch (NumberFormatException e) {
+          LOG.warn("Ignoring task pre-emption metrics interval {} from {} as it is invalid",
+              strInterval, Arrays.toString(strIntervals));
+        }
+      }
+    }
+    this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors,
+        Ints.toArray(intervalList));
     this.metrics.setMemoryPerInstance(executorMemoryBytes);
     this.metrics.setCacheMemoryPerInstance(ioMemoryBytes);
     this.metrics.setJvmMaxMemory(maxJvmMemory);
     this.metrics.setWaitQueueSize(waitQueueSize);
-    this.metrics.setRpcNumHandlers(numHandlers);
     metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
     this.llapDaemonInfoBean = MBeans.register("LlapDaemon", "LlapDaemonInfo", this);
     LOG.info("Started LlapMetricsSystem with displayName: " + displayName +

http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 57dd828..f621af2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -44,6 +44,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tez.runtime.task.EndReason;
 import org.apache.tez.runtime.task.TaskRunner2Result;
@@ -104,10 +105,11 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
   final ConcurrentMap<String, TaskWrapper> knownTasks = new ConcurrentHashMap<>();
 
   private final Object lock = new Object();
+  private final LlapDaemonExecutorMetrics metrics;
 
   public TaskExecutorService(int numExecutors, int waitQueueSize,
       String waitQueueComparatorClassName, boolean enablePreemption,
-      ClassLoader classLoader) {
+      ClassLoader classLoader, final LlapDaemonExecutorMetrics metrics) {
     super(TaskExecutorService.class.getSimpleName());
     LOG.info("TaskExecutorService is being setup with parameters: "
         + "numExecutors=" + numExecutors
@@ -127,6 +129,10 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
         new PreemptionQueueComparator());
     this.enablePreemption = enablePreemption;
     this.numSlotsAvailable = new AtomicInteger(numExecutors);
+    this.metrics = metrics;
+    if (metrics != null) {
+      metrics.setNumExecutorsAvailable(numSlotsAvailable.get());
+    }
 
     // single threaded scheduler for tasks from wait queue to executor threads
     ExecutorService wes = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
@@ -267,7 +273,11 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
               trySchedule(task);
               // wait queue could have been re-ordered in the mean time because of concurrent task
               // submission. So remove the specific task instead of the head task.
-              waitQueue.remove(task);
+              if (waitQueue.remove(task)) {
+                if (metrics != null) {
+                  metrics.setExecutorNumQueuedRequests(waitQueue.size());
+                }
+              }
             } catch (RejectedExecutionException e) {
               rejectedException = e;
             }
@@ -361,6 +371,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
         if (isDebugEnabled) {
           LOG.debug("{} is {} as wait queue is full", taskWrapper.getRequestId(), result);
         }
+        if (metrics != null) {
+          metrics.incrTotalRejectedRequests();
+        }
         return result;
       }
     }
@@ -392,11 +405,17 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
         LOG.info("{} evicted from wait queue in favor of {} because of lower priority",
             evictedTask.getRequestId(), task.getRequestId());
       }
+      if (metrics != null) {
+        metrics.incrTotalEvictedFromWaitQueue();
+      }
     }
     synchronized (lock) {
       lock.notify();
     }
 
+    if (metrics != null) {
+      metrics.setExecutorNumQueuedRequests(waitQueue.size());
+    }
     return result;
   }
 
@@ -411,7 +430,11 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
             LOG.debug("Removing {} from waitQueue", fragmentId);
           }
           taskWrapper.setIsInWaitQueue(false);
-          waitQueue.remove(taskWrapper);
+          if (waitQueue.remove(taskWrapper)) {
+            if (metrics != null) {
+              metrics.setExecutorNumQueuedRequests(waitQueue.size());
+            }
+          }
         }
         if (taskWrapper.isInPreemptionQueue()) {
           if (isDebugEnabled) {
@@ -419,6 +442,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
           }
           taskWrapper.setIsInPreemptableQueue(false);
           preemptionQueue.remove(taskWrapper);
+          if (metrics != null) {
+            metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
+          }
         }
         taskWrapper.getTaskRunnerCallable().killTask();
       } else {
@@ -460,6 +486,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
         }
       }
       numSlotsAvailable.decrementAndGet();
+      if (metrics != null) {
+        metrics.setNumExecutorsAvailable(numSlotsAvailable.get());
+      }
   }
 
   private void handleScheduleAttemptedRejection(TaskWrapper taskWrapper) {
@@ -511,11 +540,17 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
         LOG.debug("Removing {} from preemption queue because it's state changed to {}",
             taskWrapper.getRequestId(), newFinishableState);
         preemptionQueue.remove(taskWrapper.getTaskRunnerCallable());
+        if (metrics != null) {
+          metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
+        }
       } else if (newFinishableState == false && !taskWrapper.isInPreemptionQueue() &&
           !taskWrapper.isInWaitQueue()) {
         LOG.debug("Adding {} to preemption queue since finishable state changed to {}",
             taskWrapper.getRequestId(), newFinishableState);
         preemptionQueue.offer(taskWrapper);
+        if (metrics != null) {
+          metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
+        }
       }
       lock.notify();
     }
@@ -525,6 +560,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
     synchronized (lock) {
       preemptionQueue.add(taskWrapper);
       taskWrapper.setIsInPreemptableQueue(true);
+      if (metrics != null) {
+        metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
+      }
     }
   }
 
@@ -534,6 +572,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
        taskWrapper = preemptionQueue.remove();
       if (taskWrapper != null) {
         taskWrapper.setIsInPreemptableQueue(false);
+        if (metrics != null) {
+          metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
+        }
       }
     }
     return taskWrapper;
@@ -582,9 +623,15 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
               .getTaskIdentifierString(taskWrapper.getTaskRunnerCallable().getRequest())
               + " request " + state + "! Removed from preemption list.");
         }
+        if (metrics != null) {
+          metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
+        }
       }
 
       numSlotsAvailable.incrementAndGet();
+      if (metrics != null) {
+        metrics.setNumExecutorsAvailable(numSlotsAvailable.get());
+      }
       if (isDebugEnabled) {
         LOG.debug("Task {} complete. WaitQueueSize={}, numSlotsAvailable={}, preemptionQueueSize={}",
           taskWrapper.getRequestId(), waitQueue.size(), numSlotsAvailable.get(),

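A note on the pattern above: queue depth is now reported by setting gauges from the live queue sizes rather than incrementing and decrementing counters, and every update is guarded because the metrics object may be null (the tests pass null). A hypothetical helper illustrating the pattern, relying on the TaskExecutorService fields already shown:

    // Hypothetical helper; metrics, waitQueue, preemptionQueue and numSlotsAvailable are the
    // existing TaskExecutorService fields. Gauges are set from current sizes, and all calls
    // are skipped when metrics collection is disabled (metrics == null, as in the tests).
    private void publishQueueGauges() {
      if (metrics != null) {
        metrics.setExecutorNumQueuedRequests(waitQueue.size());
        metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
        metrics.setNumExecutorsAvailable(numSlotsAvailable.get());
      }
    }
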
http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 2a60123..fcfa940 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -413,10 +413,15 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
         // Only the KILLED case requires a message to be sent out to the AM.
         case SUCCESS:
           LOG.debug("Successfully finished {}", requestId);
-          metrics.incrExecutorTotalSuccess();
+          if (metrics != null) {
+            metrics.incrExecutorTotalSuccess();
+          }
           break;
         case CONTAINER_STOP_REQUESTED:
           LOG.info("Received container stop request (AM preemption) for {}", requestId);
+          if (metrics != null) {
+            metrics.incrExecutorTotalKilled();
+          }
           break;
         case KILL_REQUESTED:
           LOG.info("Killed task {}", requestId);
@@ -424,17 +429,26 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
             killtimerWatch.stop();
             long elapsed = killtimerWatch.elapsedMillis();
             LOG.info("Time to die for task {}", elapsed);
+            if (metrics != null) {
+              metrics.addMetricsPreemptionTimeToKill(elapsed);
+            }
+          }
+          if (metrics != null) {
+            metrics.addMetricsPreemptionTimeLost(runtimeWatch.elapsedMillis());
+            metrics.incrExecutorTotalKilled();
           }
-          metrics.incrPreemptionTimeLost(runtimeWatch.elapsedMillis());
-          metrics.incrExecutorTotalKilled();
           break;
         case COMMUNICATION_FAILURE:
           LOG.info("Failed to run {} due to communication failure", requestId);
-          metrics.incrExecutorTotalExecutionFailed();
+          if (metrics != null) {
+            metrics.incrExecutorTotalExecutionFailed();
+          }
           break;
         case TASK_ERROR:
           LOG.info("Failed to run {} due to task error", requestId);
-          metrics.incrExecutorTotalExecutionFailed();
+          if (metrics != null) {
+            metrics.incrExecutorTotalExecutionFailed();
+          }
           break;
       }
       fragmentCompletionHanler.fragmentComplete(fragmentInfo);
@@ -448,7 +462,6 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
               request.getFragmentSpec().getFragmentNumber(),
               request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName,
               taskRunnerCallable.startTime, true);
-      metrics.decrExecutorNumQueuedRequests();
     }
 
     @Override
@@ -466,9 +479,6 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
               request.getFragmentSpec().getFragmentNumber(),
               request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName,
               taskRunnerCallable.startTime, false);
-      if (metrics != null) {
-        metrics.decrExecutorNumQueuedRequests();
-      }
     }
   }
 

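The two preemption timings above feed the new totals, maxima and percentile estimators. A sketch of the accounting, with System.nanoTime() standing in for the stopwatches used in TaskRunnerCallable and runtimeSoFarMs as a hypothetical value:

    // killRequestedNs is captured when the kill is requested (hypothetical variable).
    long timeToKillMs = java.util.concurrent.TimeUnit.NANOSECONDS.toMillis(
        System.nanoTime() - killRequestedNs);
    long usefulTimeLostMs = runtimeSoFarMs; // hypothetical: how long the preempted task had run
    if (metrics != null) {
      metrics.addMetricsPreemptionTimeToKill(timeToKillMs);   // updates total, max and percentiles
      metrics.addMetricsPreemptionTimeLost(usefulTimeLostMs); // same, for wasted task runtime
      metrics.incrExecutorTotalKilled();
    }
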
http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 6a72b4c..fea3dc7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -139,7 +139,6 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
     int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE);
     executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads,
         new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build()));
-    ioMetrics.setIoThreadPoolSize(numThreads);
     // TODO: this should depends on input format and be in a map, or something.
     this.cvp = new OrcColumnVectorProducer(
         metadataCache, orcCache, bufferManager, conf, cacheMetrics, ioMetrics);

http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index 0ba7c09..6b54b30 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -53,7 +53,6 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
             // Don't reset anything, we are reusing column vectors.
           }
         });
-    this.ioMetrics.setColumnVectorBatchPoolSize(cvbPool.size());
   }
 
   public void init(ConsumerFeedback<BatchType> upstreamFeedback,
@@ -62,10 +61,6 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
     this.readCallable = readCallable;
   }
 
-  public LlapDaemonIOMetrics getIOMetrics() {
-    return ioMetrics;
-  }
-
   @Override
   public Callable<Void> getReadCallable() {
     return readCallable;

http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 83011fb..7effe69 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -182,9 +182,6 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    LlapDaemonIOMetrics ioMetrics = consumer.getIOMetrics();
-    ioMetrics.setColumnStreamDataPoolSize(CSD_POOL.size());
-    ioMetrics.setEncodedColumnBatchPoolSize(ECB_POOL.size());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheInfo.java
index 191345e..427a0b1 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheInfo.java
@@ -26,6 +26,7 @@ import com.google.common.base.Objects;
  */
 public enum LlapDaemonCacheInfo implements MetricsInfo {
   CacheMetrics("Llap daemon cache related metrics"),
+  CacheCapacityRemainingPercentage("Percentage of memory available in cache"),
   CacheCapacityRemaining("Amount of memory available in cache in bytes"),
   CacheCapacityTotal("Total amount of memory allocated for cache in bytes"),
   CacheCapacityUsed("Amount of memory used in cache in bytes"),
@@ -34,10 +35,7 @@ public enum LlapDaemonCacheInfo implements MetricsInfo {
   CacheHitRatio("Ratio of disk ranges cached vs requested"),
   CacheReadRequests("Number of disk range requests to cache"),
   CacheAllocatedArena("Number of arenas allocated"),
-  CacheNumLockedBuffers("Number of locked buffers in cache"),
-  CacheArenaSize("Size of arena used by allocator"),
-  CacheMinAllocationSize("Minimum allocation size used by allocator"),
-  CacheMaxAllocationSize("Maximum allocation size used by allocator");
+  CacheNumLockedBuffers("Number of locked buffers in cache");
 
   private final String desc;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java
index bb76da5..5f30b2d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java
@@ -18,18 +18,16 @@
 package org.apache.hadoop.hive.llap.metrics;
 
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheAllocatedArena;
-import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheArenaSize;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheCapacityRemaining;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheCapacityRemainingPercentage;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheCapacityTotal;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheCapacityUsed;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheHitBytes;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheHitRatio;
-import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheMaxAllocationSize;
-import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheMinAllocationSize;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheMetrics;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheNumLockedBuffers;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheReadRequests;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheRequestedBytes;
-import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheMetrics;
 import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
 import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
 
@@ -68,12 +66,6 @@ public class LlapDaemonCacheMetrics implements MetricsSource {
   MutableCounterLong cacheAllocatedArena;
   @Metric
   MutableCounterLong cacheNumLockedBuffers;
-  @Metric
-  MutableGaugeLong arenaSize;
-  @Metric
-  MutableGaugeLong minAllocationSize;
-  @Metric
-  MutableGaugeLong maxAllocationSize;
 
   private LlapDaemonCacheMetrics(String name, String sessionId) {
     this.name = name;
@@ -115,18 +107,6 @@ public class LlapDaemonCacheMetrics implements MetricsSource {
     cacheNumLockedBuffers.incr();
   }
 
-  public void setArenaSize(long value) {
-    arenaSize.set(value);
-  }
-
-  public void setMinAllocationSize(long value) {
-    minAllocationSize.set(value);
-  }
-
-  public void setMaxAllocationSize(long value) {
-    maxAllocationSize.set(value);
-  }
-
   public void decrCacheNumLockedBuffers() {
     cacheNumLockedBuffers.incr(-1);
   }
@@ -158,7 +138,11 @@ public class LlapDaemonCacheMetrics implements MetricsSource {
     float cacheHitRatio = cacheRequestedBytes.value() == 0 ? 0.0f :
         (float) cacheHitBytes.value() / (float) cacheRequestedBytes.value();
 
-    rb.addCounter(CacheCapacityRemaining, cacheCapacityTotal.value() - cacheCapacityUsed.value())
+    long cacheCapacityRemaining = cacheCapacityTotal.value() - cacheCapacityUsed.value();
+    float cacheRemainingPercent = cacheCapacityTotal.value() == 0 ? 0.0f :
+        (float) cacheCapacityRemaining / (float) cacheCapacityTotal.value();
+    rb.addCounter(CacheCapacityRemaining, cacheCapacityRemaining)
+        .addGauge(CacheCapacityRemainingPercentage, cacheRemainingPercent)
         .addCounter(CacheCapacityTotal, cacheCapacityTotal.value())
         .addCounter(CacheCapacityUsed, cacheCapacityUsed.value())
         .addCounter(CacheReadRequests, cacheReadRequests.value())
@@ -166,10 +150,7 @@ public class LlapDaemonCacheMetrics implements MetricsSource {
         .addCounter(CacheHitBytes, cacheHitBytes.value())
         .addCounter(CacheAllocatedArena, cacheAllocatedArena.value())
         .addCounter(CacheNumLockedBuffers, cacheNumLockedBuffers.value())
-        .addGauge(CacheHitRatio, cacheHitRatio)
-        .addGauge(CacheArenaSize, arenaSize.value())
-        .addGauge(CacheMinAllocationSize, minAllocationSize.value())
-        .addGauge(CacheMaxAllocationSize, maxAllocationSize.value());
+        .addGauge(CacheHitRatio, cacheHitRatio);
   }
 
 }

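The new CacheCapacityRemainingPercentage gauge is derived at snapshot time from the existing counters rather than being tracked separately; the arithmetic, with illustrative values:

    long total = 4L * 1024 * 1024 * 1024;       // CacheCapacityTotal: 4 GB
    long used  = 3L * 1024 * 1024 * 1024;       // CacheCapacityUsed:  3 GB
    long remaining = total - used;              // CacheCapacityRemaining: 1 GB
    float remainingPct = total == 0 ? 0.0f
        : (float) remaining / (float) total;    // 0.25, i.e. 25% of the cache is free
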
http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
index 941d926..db5fd4f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
@@ -26,21 +26,32 @@ import com.google.common.base.Objects;
  */
 public enum LlapDaemonExecutorInfo implements MetricsInfo {
   ExecutorMetrics("Llap daemon cache related metrics"),
-  ExecutorThreadCountPerInstance("Total number of executor threads per node"),
+  ExecutorMaxFreeSlots("Sum of wait queue size and number of executors"),
+  ExecutorNumExecutorsPerInstance("Total number of executor threads per node"),
+  ExecutorNumExecutorsAvailable("Total number of executor threads per node that are free"),
+  ExecutorAvailableFreeSlots("Number of free slots available"),
+  ExecutorAvailableFreeSlotsPercent("Percent of free slots available"),
+  ExecutorThreadCPUTime("Cpu time in nanoseconds"),
   ExecutorMemoryPerInstance("Total memory for executors per node in bytes"),
   ExecutorCacheMemoryPerInstance("Total Cache memory per node in bytes"),
   ExecutorJvmMaxMemory("Max memory available for JVM in bytes"),
   ExecutorWaitQueueSize("Size of wait queue per node"),
-  ExecutorRpcNumHandlers("Number of RPC handlers per node"),
-  ExecutorThreadCPUTime("Cpu time in nanoseconds"),
   ExecutorThreadUserTime("User time in nanoseconds"),
   ExecutorTotalRequestsHandled("Total number of requests handled by the container"),
   ExecutorNumQueuedRequests("Number of requests queued by the container for processing"),
+  ExecutorNumPreemptableRequests("Number of queued requests that are pre-emptable"),
+  ExecutorTotalRejectedRequests("Total number of requests rejected as wait queue being full"),
   ExecutorTotalSuccess("Total number of requests handled by the container that succeeded"),
-  ExecutorTotalExecutionFailure("Total number of requests handled by the container that failed execution"),
-  ExecutorTotalInterrupted("Total number of requests handled by the container that got interrupted"),
+  ExecutorTotalFailed("Total number of requests handled by the container that failed execution"),
+  ExecutorTotalKilled("Total number of requests handled by the container that got interrupted"),
   ExecutorTotalAskedToDie("Total number of requests handled by the container that were asked to die"),
-  PreemptionTimeLost("Total time lost due to task preemptions");
+  ExecutorTotalPreemptionTimeToKill("Total amount of time taken for killing tasks due to pre-emption"),
+  ExecutorTotalPreemptionTimeLost("Total useful cluster time lost because of pre-emption"),
+  ExecutorPercentileTimeToKill("Percentile time to kill for pre-empted tasks"),
+  ExecutorPercentileTimeLost("Percentile cluster time wasted due to pre-emption"),
+  ExecutorMaxPreemptionTimeToKill("Max time for killing pre-empted task"),
+  ExecutorMaxPreemptionTimeLost("Max cluster time lost due to pre-emption"),
+  ExecutorTotalEvictedFromWaitQueue("Total number of tasks evicted from wait queue because of low priority");
 
   private final String desc;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
index 894880f..1110683 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
@@ -17,22 +17,30 @@
  */
 package org.apache.hadoop.hive.llap.metrics;
 
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorAvailableFreeSlots;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorAvailableFreeSlotsPercent;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorCacheMemoryPerInstance;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorJvmMaxMemory;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxFreeSlots;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxPreemptionTimeLost;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxPreemptionTimeToKill;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMemoryPerInstance;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailable;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumPreemptableRequests;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumQueuedRequests;
-import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorRpcNumHandlers;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadCPUTime;
-import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadCountPerInstance;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutorsPerInstance;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadUserTime;
-import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalAskedToDie;
-import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalExecutionFailure;
-import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalInterrupted;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalEvictedFromWaitQueue;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalFailed;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalKilled;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalRejectedRequests;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalRequestsHandled;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalSuccess;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMetrics;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalPreemptionTimeLost;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalPreemptionTimeToKill;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorWaitQueueSize;
-import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.PreemptionTimeLost;
 import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
 import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
 
@@ -54,6 +62,7 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 
 /**
@@ -70,13 +79,23 @@ public class LlapDaemonExecutorMetrics implements MetricsSource {
   private final ThreadMXBean threadMXBean;
   private final Map<Integer, MetricsInfo> cpuMetricsInfoMap;
   private final Map<Integer, MetricsInfo> userMetricsInfoMap;
+  private long maxTimeLost = Long.MIN_VALUE;
+  private long maxTimeToKill = Long.MIN_VALUE;
 
   final MutableGaugeLong[] executorThreadCpuTime;
   final MutableGaugeLong[] executorThreadUserTime;
   @Metric
   MutableCounterLong executorTotalRequestHandled;
   @Metric
-  MutableCounterLong executorNumQueuedRequests;
+  MutableGaugeInt executorNumQueuedRequests;
+  @Metric
+  MutableGaugeInt executorNumPreemptableRequests;
+  @Metric
+  MutableGaugeInt numExecutorsAvailable;
+  @Metric
+  MutableCounterLong totalRejectedRequests;
+  @Metric
+  MutableCounterLong totalEvictedFromWaitQueue;
   @Metric
   MutableCounterLong executorTotalSuccess;
   @Metric
@@ -84,8 +103,6 @@ public class LlapDaemonExecutorMetrics implements MetricsSource {
   @Metric
   MutableCounterLong executorTotalExecutionFailed;
   @Metric
-  MutableCounterLong preemptionTimeLost;
-  @Metric
   MutableGaugeLong cacheMemoryPerInstance;
   @Metric
   MutableGaugeLong memoryPerInstance;
@@ -94,10 +111,20 @@ public class LlapDaemonExecutorMetrics implements MetricsSource {
   @Metric
   MutableGaugeInt waitQueueSize;
   @Metric
-  MutableGaugeInt rpcNumHandlers;
+  MutableCounterLong totalPreemptionTimeToKill;
+  @Metric
+  MutableCounterLong totalPreemptionTimeLost;
+  @Metric
+  MutableGaugeLong maxPreemptionTimeToKill;
+  @Metric
+  MutableGaugeLong maxPreemptionTimeLost;
+  @Metric
+  final MutableQuantiles[] percentileTimeToKill;
+  @Metric
+  final MutableQuantiles[] percentileTimeLost;
 
   private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sessionId,
-      int numExecutors) {
+      int numExecutors, final int[] intervals) {
     this.name = displayName;
     this.jvmMetrics = jm;
     this.sessionId = sessionId;
@@ -110,6 +137,21 @@ public class LlapDaemonExecutorMetrics implements MetricsSource {
     this.cpuMetricsInfoMap = new ConcurrentHashMap<>();
     this.userMetricsInfoMap = new ConcurrentHashMap<>();
 
+    final int len = intervals == null ? 0 : intervals.length;
+    this.percentileTimeToKill = new MutableQuantiles[len];
+    this.percentileTimeLost = new MutableQuantiles[len];
+    for (int i=0; i<len; i++) {
+      int interval = intervals[i];
+      percentileTimeToKill[i] = registry.newQuantiles(
+          LlapDaemonExecutorInfo.ExecutorMaxPreemptionTimeToKill.name() + "_" + interval + "s",
+          LlapDaemonExecutorInfo.ExecutorMaxPreemptionTimeToKill.description(),
+          "ops", "latency", interval);
+      percentileTimeLost[i] = registry.newQuantiles(
+          LlapDaemonExecutorInfo.ExecutorMaxPreemptionTimeLost.name() + "_" + interval + "s",
+          LlapDaemonExecutorInfo.ExecutorMaxPreemptionTimeLost.description(),
+          "ops", "latency", interval);
+    }
+
     for (int i = 0; i < numExecutors; i++) {
       MetricsInfo mic = new LlapDaemonCustomMetricsInfo(ExecutorThreadCPUTime.name() + "_" + i,
           ExecutorThreadCPUTime.description());
@@ -123,11 +165,11 @@ public class LlapDaemonExecutorMetrics implements MetricsSource {
   }
 
   public static LlapDaemonExecutorMetrics create(String displayName, String sessionId,
-      int numExecutors) {
+      int numExecutors, final int[] intervals) {
     MetricsSystem ms = LlapMetricsSystem.instance();
     JvmMetrics jm = JvmMetrics.create(MetricsUtils.METRICS_PROCESS_NAME, sessionId, ms);
     return ms.register(displayName, "LlapDaemon Executor Metrics",
-        new LlapDaemonExecutorMetrics(displayName, jm, sessionId, numExecutors));
+        new LlapDaemonExecutorMetrics(displayName, jm, sessionId, numExecutors, intervals));
   }
 
   @Override
@@ -143,12 +185,24 @@ public class LlapDaemonExecutorMetrics implements MetricsSource {
     executorTotalRequestHandled.incr();
   }
 
-  public void incrExecutorNumQueuedRequests() {
-    executorNumQueuedRequests.incr();
+  public void setExecutorNumQueuedRequests(int value) {
+    executorNumQueuedRequests.set(value);
+  }
+
+  public void setExecutorNumPreemptableRequests(int value) {
+    executorNumPreemptableRequests.set(value);
+  }
+
+  public void setNumExecutorsAvailable(int value) {
+    numExecutorsAvailable.set(value);
+  }
+
+  public void incrTotalEvictedFromWaitQueue() {
+    totalEvictedFromWaitQueue.incr();
   }
 
-  public void decrExecutorNumQueuedRequests() {
-    executorNumQueuedRequests.incr(-1);
+  public void incrTotalRejectedRequests() {
+    totalRejectedRequests.incr();
   }
 
   public void incrExecutorTotalSuccess() {
@@ -159,8 +213,30 @@ public class LlapDaemonExecutorMetrics implements MetricsSource {
     executorTotalExecutionFailed.incr();
   }
 
-  public void incrPreemptionTimeLost(long value) {
-    preemptionTimeLost.incr(value);
+  public void addMetricsPreemptionTimeLost(long value) {
+    totalPreemptionTimeLost.incr(value);
+
+    if (value > maxTimeLost) {
+      maxTimeLost = value;
+      maxPreemptionTimeLost.set(maxTimeLost);
+    }
+
+    for (MutableQuantiles q : percentileTimeLost) {
+      q.add(value);
+    }
+  }
+
+  public void addMetricsPreemptionTimeToKill(long value) {
+    totalPreemptionTimeToKill.incr(value);
+
+    if (value > maxTimeToKill) {
+      maxTimeToKill = value;
+      maxPreemptionTimeToKill.set(maxTimeToKill);
+    }
+
+    for (MutableQuantiles q : percentileTimeToKill) {
+      q.add(value);
+    }
   }
 
   public void incrExecutorTotalKilled() {
@@ -183,25 +259,43 @@ public class LlapDaemonExecutorMetrics implements MetricsSource {
     waitQueueSize.set(size);
   }
 
-  public void setRpcNumHandlers(int numHandlers) {
-    rpcNumHandlers.set(numHandlers);
-  }
-
   private void getExecutorStats(MetricsRecordBuilder rb) {
     updateThreadMetrics(rb);
+    final int totalSlots = waitQueueSize.value() + numExecutors;
+    final int slotsAvailableInQueue = waitQueueSize.value() - executorNumQueuedRequests.value();
+    final int slotsAvailableTotal = slotsAvailableInQueue + numExecutorsAvailable.value();
+    final float slotsAvailablePercent = totalSlots <= 0 ? 0.0f :
+        (float) slotsAvailableTotal / (float) totalSlots;
 
     rb.addCounter(ExecutorTotalRequestsHandled, executorTotalRequestHandled.value())
-        .addCounter(ExecutorNumQueuedRequests, executorNumQueuedRequests.value())
         .addCounter(ExecutorTotalSuccess, executorTotalSuccess.value())
-        .addCounter(ExecutorTotalExecutionFailure, executorTotalExecutionFailed.value())
-        .addCounter(ExecutorTotalInterrupted, executorTotalIKilled.value())
-        .addCounter(PreemptionTimeLost, preemptionTimeLost.value())
-        .addGauge(ExecutorThreadCountPerInstance, numExecutors)
+        .addCounter(ExecutorTotalFailed, executorTotalExecutionFailed.value())
+        .addCounter(ExecutorTotalKilled, executorTotalIKilled.value())
+        .addCounter(ExecutorTotalEvictedFromWaitQueue, totalEvictedFromWaitQueue.value())
+        .addCounter(ExecutorTotalRejectedRequests, totalRejectedRequests.value())
+        .addGauge(ExecutorNumQueuedRequests, executorNumQueuedRequests.value())
+        .addGauge(ExecutorNumPreemptableRequests, executorNumPreemptableRequests.value())
         .addGauge(ExecutorMemoryPerInstance, memoryPerInstance.value())
         .addGauge(ExecutorCacheMemoryPerInstance, cacheMemoryPerInstance.value())
         .addGauge(ExecutorJvmMaxMemory, jvmMaxMemory.value())
+        .addGauge(ExecutorMaxFreeSlots, totalSlots)
+        .addGauge(ExecutorNumExecutorsPerInstance, numExecutors)
         .addGauge(ExecutorWaitQueueSize, waitQueueSize.value())
-        .addGauge(ExecutorRpcNumHandlers, rpcNumHandlers.value());
+        .addGauge(ExecutorNumExecutorsAvailable, numExecutorsAvailable.value())
+        .addGauge(ExecutorAvailableFreeSlots, slotsAvailableTotal)
+        .addGauge(ExecutorAvailableFreeSlotsPercent, slotsAvailablePercent)
+        .addCounter(ExecutorTotalPreemptionTimeToKill, totalPreemptionTimeToKill.value())
+        .addCounter(ExecutorTotalPreemptionTimeLost, totalPreemptionTimeLost.value())
+        .addGauge(ExecutorMaxPreemptionTimeToKill, maxPreemptionTimeToKill.value())
+        .addGauge(ExecutorMaxPreemptionTimeLost, maxPreemptionTimeLost.value());
+
+    for (MutableQuantiles q : percentileTimeToKill) {
+      q.snapshot(rb, true);
+    }
+
+    for (MutableQuantiles q : percentileTimeLost) {
+      q.snapshot(rb, true);
+    }
   }
 
   private void updateThreadMetrics(MetricsRecordBuilder rb) {

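The percentile metrics are built on Hadoop's MutableQuantiles, one estimator per configured rollover interval; each observation is added to every estimator, and snapshots roll over per interval. A standalone sketch using the same call shape (the registry name and sample value are arbitrary):

    import org.apache.hadoop.metrics2.lib.MetricsRegistry;
    import org.apache.hadoop.metrics2.lib.MutableQuantiles;

    public class QuantilesSketch {
      public static void main(String[] args) {
        MetricsRegistry registry = new MetricsRegistry("PreemptionSketch");
        int[] intervals = {30, 60, 300};  // default rollover intervals, in seconds
        MutableQuantiles[] timeToKill = new MutableQuantiles[intervals.length];
        for (int i = 0; i < intervals.length; i++) {
          // Same argument shape as LlapDaemonExecutorMetrics: name, description,
          // sample name, value name, rollover interval in seconds.
          timeToKill[i] = registry.newQuantiles(
              "ExecutorMaxPreemptionTimeToKill_" + intervals[i] + "s",
              "Percentile time to kill for pre-empted tasks", "ops", "latency", intervals[i]);
        }
        // Each observed kill latency is fed to every estimator.
        for (MutableQuantiles q : timeToKill) {
          q.add(1200L); // 1.2 seconds to kill, illustrative
        }
      }
    }
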
http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOInfo.java
index 79f004b..f0fde62 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOInfo.java
@@ -26,10 +26,6 @@ import com.google.common.base.Objects;
  */
 public enum LlapDaemonIOInfo implements MetricsInfo {
   IOMetrics("Llap daemon I/O elevator metrics"),
-  IoThreadPoolSize("Size of the thread pool used by IO elevator"),
-  EncodedColumnBatchPoolSize("Size of the object pool that stores encoded column batches"),
-  ColumnStreamDataPoolSize("Size of the object pool that stores column stream data"),
-  ColumnVectorBatchPoolSize("Size of the object pool that stores column vector batches"),
   PercentileDecodingTime("Percentile decoding time for encoded column batch"),
   MaxDecodingTime("Max time for decoding an encoded column batch");
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOMetrics.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOMetrics.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOMetrics.java
index f3def75..36eb0e5 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOMetrics.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOMetrics.java
@@ -17,12 +17,8 @@
  */
 package org.apache.hadoop.hive.llap.metrics;
 
-import static org.apache.hadoop.hive.llap.metrics.LlapDaemonIOInfo.ColumnStreamDataPoolSize;
-import static org.apache.hadoop.hive.llap.metrics.LlapDaemonIOInfo.ColumnVectorBatchPoolSize;
-import static org.apache.hadoop.hive.llap.metrics.LlapDaemonIOInfo.EncodedColumnBatchPoolSize;
-import static org.apache.hadoop.hive.llap.metrics.LlapDaemonIOInfo.IoThreadPoolSize;
-import static org.apache.hadoop.hive.llap.metrics.LlapDaemonIOInfo.MaxDecodingTime;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonIOInfo.IOMetrics;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonIOInfo.MaxDecodingTime;
 import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
 import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
 
@@ -33,7 +29,6 @@ import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
-import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 import org.apache.hadoop.metrics2.lib.MutableRate;
@@ -52,14 +47,6 @@ public class LlapDaemonIOMetrics implements MetricsSource {
   private long maxTime = Long.MIN_VALUE;
 
   @Metric
-  MutableGaugeInt encodedColumnBatchPoolSize;
-  @Metric
-  MutableGaugeInt columnStreamDataPoolSize;
-  @Metric
-  MutableGaugeInt columnVectorBatchPool;
-  @Metric
-  MutableGaugeInt ioThreadPoolSize;
-  @Metric
   MutableRate rateOfDecoding;
   final MutableQuantiles[] decodingTimes;
   @Metric
@@ -101,22 +88,6 @@ public class LlapDaemonIOMetrics implements MetricsSource {
     return name;
   }
 
-  public void setEncodedColumnBatchPoolSize(int size) {
-    encodedColumnBatchPoolSize.set(size);
-  }
-
-  public void setColumnStreamDataPoolSize(int size) {
-    columnStreamDataPoolSize.set(size);
-  }
-
-  public void setColumnVectorBatchPoolSize(int size) {
-    columnVectorBatchPool.set(size);
-  }
-
-  public void setIoThreadPoolSize(int size) {
-    ioThreadPoolSize.set(size);
-  }
-
   public void addDecodeBatchTime(long latency) {
     rateOfDecoding.add(latency);
     if (latency > maxTime) {
@@ -129,11 +100,7 @@ public class LlapDaemonIOMetrics implements MetricsSource {
   }
 
   private void getIoStats(MetricsRecordBuilder rb) {
-    rb.addGauge(EncodedColumnBatchPoolSize, encodedColumnBatchPoolSize.value())
-        .addGauge(ColumnStreamDataPoolSize, columnStreamDataPoolSize.value())
-        .addGauge(ColumnVectorBatchPoolSize, columnVectorBatchPool.value())
-        .addGauge(IoThreadPoolSize, ioThreadPoolSize.value())
-        .addGauge(MaxDecodingTime, maxDecodingTime.value());
+    rb.addGauge(MaxDecodingTime, maxDecodingTime.value());
     rateOfDecoding.snapshot(rb, true);
 
     for (MutableQuantiles q : decodingTimes) {

http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index d1edd12..506f611 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -190,7 +190,7 @@ public class TestTaskExecutorService {
     public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName,
                                       boolean enablePreemption) {
       super(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption,
-          Thread.currentThread().getContextClassLoader());
+          Thread.currentThread().getContextClassLoader(), null);
     }
 
     private ConcurrentMap<String, InternalCompletionListenerForTest> completionListeners = new ConcurrentHashMap<>();

http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 5ecbf79..c3d3a1d 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -56,10 +56,14 @@ import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
+import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
 import org.apache.hadoop.hive.llap.registry.ServiceInstance;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerMetrics;
+import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -173,12 +177,16 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   @VisibleForTesting
   StatsPerDag dagStats = new StatsPerDag();
 
+  private final LlapTaskSchedulerMetrics metrics;
+  private final JvmPauseMonitor pauseMonitor;
+
   public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
-    this(taskSchedulerContext, new SystemClock());
+    this(taskSchedulerContext, new SystemClock(), true);
   }
 
   @VisibleForTesting
-  public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock clock) {
+  public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock clock,
+      boolean initMetrics) {
     super(taskSchedulerContext);
     this.clock = clock;
     try {
@@ -236,6 +244,24 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build());
     schedulerExecutor = MoreExecutors.listeningDecorator(schedulerExecutorServiceRaw);
 
+    if (initMetrics) {
+      // Initialize the metrics system
+      LlapMetricsSystem.initialize("LlapDaemon");
+      this.pauseMonitor = new JvmPauseMonitor(conf);
+      pauseMonitor.start();
+      String displayName = "LlapTaskSchedulerMetrics-" + MetricsUtils.getHostName();
+      String sessionId = conf.get("llap.daemon.metrics.sessionid");
+      // TODO: Not sure about the use of this. Should we instead use workerIdentity as sessionId?
+      this.metrics = LlapTaskSchedulerMetrics.create(displayName, sessionId);
+      this.metrics.setNumExecutors(executorsPerInstance);
+      this.metrics.setMemoryPerInstance(memoryPerInstance * 1024L * 1024L);
+      this.metrics.setCpuCoresPerInstance(coresPerExecutor);
+      this.metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
+    } else {
+      this.metrics = null;
+      this.pauseMonitor = null;
+    }
+
     LOG.info("Running with configuration: " + "memoryPerInstance=" + memoryPerInstance
         + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance="
         + executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor
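
Note: the block above initializes the shared hadoop-metrics2 system and registers an LlapTaskSchedulerMetrics source under the display name "LlapTaskSchedulerMetrics-<hostname>", so the new counters and gauges should be visible to any configured metrics2 sink and, typically, over JMX in the Tez AM process. A rough sketch of the JMX route follows; the "Hadoop:service=...,name=..." ObjectName pattern and the attribute name are the usual hadoop-metrics2 defaults and are assumptions here, not something this patch defines.

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    // Hedged sketch: list the scheduler metrics MBeans and print one counter.
    // Assumes the default hadoop-metrics2 MBean naming and that this runs in the same
    // JVM that initialized LlapMetricsSystem (i.e. the Tez AM).
    public class DumpSchedulerMetrics {
      public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName pattern =
            new ObjectName("Hadoop:service=LlapDaemon,name=LlapTaskSchedulerMetrics*");
        for (ObjectName name : server.queryNames(pattern, null)) {
          System.out.println(name + " SchedulerPendingTaskCount="
              + server.getAttribute(name, "SchedulerPendingTaskCount"));
        }
      }
    }
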
@@ -280,7 +306,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       registry.registerStateChangeListener(new NodeStateChangeListener());
       activeInstances = registry.getInstances();
       for (ServiceInstance inst : activeInstances.getAll().values()) {
-        addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode));
+        addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode,
+            metrics));
       }
     } finally {
       writeLock.unlock();
@@ -293,14 +320,14 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     @Override
     public void onCreate(final ServiceInstance serviceInstance) {
       addNode(serviceInstance, new NodeInfo(serviceInstance, nodeBlacklistConf, clock,
-          numSchedulableTasksPerNode));
+          numSchedulableTasksPerNode, metrics));
       LOG.info("Added node with identity: {}", serviceInstance.getWorkerIdentity());
     }
 
     @Override
     public void onUpdate(final ServiceInstance serviceInstance) {
       instanceToNodeMap.put(serviceInstance.getWorkerIdentity(), new NodeInfo(serviceInstance,
-          nodeBlacklistConf, clock, numSchedulableTasksPerNode));
+          nodeBlacklistConf, clock, numSchedulableTasksPerNode, metrics));
       LOG.info("Updated node with identity: {}", serviceInstance.getWorkerIdentity());
     }
 
@@ -309,6 +336,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       // FIXME: disabling this for now
       // instanceToNodeMap.remove(serviceInstance.getWorkerIdentity());
       LOG.info("Removed node with identity: {}", serviceInstance.getWorkerIdentity());
+      if (metrics != null) {
+        metrics.setClusterNodeCount(activeInstances.size());
+      }
       // if there are no more nodes. Signal timeout monitor to start timer
       if (activeInstances.size() == 0) {
         LOG.info("No node found. Signalling scheduler timeout monitor thread to start timer.");
@@ -378,6 +408,15 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         if (registry != null) {
           registry.stop();
         }
+
+        if (pauseMonitor != null) {
+          pauseMonitor.stop();
+        }
+
+        if (metrics != null) {
+          LlapMetricsSystem.shutdown();
+        }
+
       }
     } finally {
       writeLock.unlock();
@@ -454,6 +493,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     // This is effectively DAG completed, and can be used to reset statistics being tracked.
     LOG.info("DAG: " + dagCounter.get() + " completed. Scheduling stats: " + dagStats);
     dagCounter.incrementAndGet();
+    if (metrics != null) {
+      metrics.incrCompletedDagCount();
+    }
     dagStats = new StatsPerDag();
   }
 
@@ -544,9 +586,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           // Also reset commFailures since a task was able to communicate back and indicate success.
           nodeInfo.enableNode();
           // Re-insert into the queue to force the poll thread to remove the element.
-          if ( disabledNodesQueue.remove(nodeInfo)) {
-            disabledNodesQueue.add(nodeInfo);
-          }
+          reinsertNodeInfo(nodeInfo);
         }
         // In case of success, trigger a scheduling run for pending tasks.
         trySchedulingPendingTasks();
@@ -562,9 +602,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
             // Also reset commFailures since a task was able to communicate back and indicate success.
             nodeInfo.enableNode();
             // Re-insert into the queue to force the poll thread to remove the element.
-            if (disabledNodesQueue.remove(nodeInfo)) {
-              disabledNodesQueue.add(nodeInfo);
-            }
+            reinsertNodeInfo(nodeInfo);
           }
           // In case of success, trigger a scheduling run for pending tasks.
           trySchedulingPendingTasks();
@@ -599,6 +637,15 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     return true;
   }
 
+  private void reinsertNodeInfo(final NodeInfo nodeInfo) {
+    if (disabledNodesQueue.remove(nodeInfo)) {
+      disabledNodesQueue.add(nodeInfo);
+    }
+    if (metrics != null) {
+      metrics.setDisabledNodeCount(disabledNodesQueue.size());
+    }
+  }
+
   @Override
   public Object deallocateContainer(ContainerId containerId) {
     LOG.debug("Ignoring deallocateContainer for containerId: " + containerId);
@@ -704,7 +751,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         if (inst.isAlive() && instanceToNodeMap.containsKey(inst.getWorkerIdentity()) == false) {
           /* that's a good node, not added to the allocations yet */
           LOG.info("Found a new node: " + inst + ".");
-          addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode));
+          addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode,
+              metrics));
         }
       }
     } finally {
@@ -720,6 +768,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       stopTimeoutMonitor();
     }
     instanceToNodeMap.put(inst.getWorkerIdentity(), node);
+    if (metrics != null) {
+      metrics.setClusterNodeCount(activeInstances.size());
+    }
     // Trigger scheduling since a new node became available.
     trySchedulingPendingTasks();
   }
@@ -752,6 +803,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         nodeInfo.disableNode(isCommFailure);
         // TODO: handle task to container map events in case of hard failures
         disabledNodesQueue.add(nodeInfo);
+        if (metrics != null) {
+          metrics.setDisabledNodeCount(disabledNodesQueue.size());
+        }
       }
     } finally {
       writeLock.unlock();
@@ -768,6 +822,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       }
       tasksAtPriority.add(taskInfo);
       knownTasks.putIfAbsent(taskInfo.task, taskInfo);
+      if (metrics != null) {
+        metrics.incrPendingTasksCount();
+      }
     } finally {
       writeLock.unlock();
     }
@@ -799,6 +856,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         runningTasks.put(priority, tasksAtpriority);
       }
       tasksAtpriority.add(taskInfo);
+      if (metrics != null) {
+        metrics.decrPendingTasksCount();
+      }
     } finally {
       writeLock.unlock();
     }
@@ -1034,6 +1094,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     writeLock.lock();
     try {
       pendingPreemptions.incrementAndGet();
+      if (metrics != null) {
+        metrics.incrPendingPreemptionTasksCount();
+      }
       MutableInt val = pendingPreemptionsPerHost.get(host);
       if (val == null) {
         val = new MutableInt(1);
@@ -1049,6 +1112,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     writeLock.lock();
     try {
       pendingPreemptions.decrementAndGet();
+      if (metrics != null) {
+        metrics.decrPendingPreemptionTasksCount();
+      }
       MutableInt val = pendingPreemptionsPerHost.get(host);
       Preconditions.checkNotNull(val);
       val.decrement();
@@ -1199,23 +1265,24 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     private int numPreemptedTasks = 0;
     private int numScheduledTasks = 0;
     private final int numSchedulableTasks;
-
+    private final LlapTaskSchedulerMetrics metrics;
 
     /**
      * Create a NodeInfo bound to a service instance
-     *
-     * @param serviceInstance         the associated serviceInstance
+     *  @param serviceInstance         the associated serviceInstance
      * @param blacklistConf           blacklist configuration
      * @param clock                   clock to use to obtain timing information
      * @param numSchedulableTasksConf number of schedulable tasks on the node. 0 represents auto
-     *                                detect based on the serviceInstance, -1 indicates indicates
-     *                                unlimited capacity
+     *                                detect based on the serviceInstance, -1 indicates unlimited capacity
+     * @param metrics                 task scheduler metrics, may be null if metrics are disabled
      */
-    NodeInfo(ServiceInstance serviceInstance, NodeBlacklistConf blacklistConf, Clock clock, int numSchedulableTasksConf) {
+    NodeInfo(ServiceInstance serviceInstance, NodeBlacklistConf blacklistConf, Clock clock,
+        int numSchedulableTasksConf, final LlapTaskSchedulerMetrics metrics) {
       Preconditions.checkArgument(numSchedulableTasksConf >= -1, "NumSchedulableTasks must be >=-1");
       this.serviceInstance = serviceInstance;
       this.blacklistConf = blacklistConf;
       this.clock = clock;
+      this.metrics = metrics;
 
       if (numSchedulableTasksConf == 0) {
         int pendingQueueuCapacity = 0;
@@ -1234,6 +1301,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       } else {
         this.numSchedulableTasks = numSchedulableTasksConf;
       }
+      if (metrics != null) {
+        metrics.incrSchedulableTasksCount(numSchedulableTasks);
+      }
       LOG.info("Setting up node: " + serviceInstance + " with schedulableCapacity=" + this.numSchedulableTasks);
     }
 
@@ -1275,17 +1345,33 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
     void registerTaskScheduled() {
       numScheduledTasks++;
+      if (metrics != null) {
+        metrics.incrRunningTasksCount();
+        metrics.decrSchedulableTasksCount();
+      }
     }
 
     void registerTaskSuccess() {
       numSuccessfulTasks++;
       numScheduledTasks--;
+      if (metrics != null) {
+        metrics.incrSuccessfulTasksCount();
+        metrics.decrRunningTasksCount();
+        metrics.incrSchedulableTasksCount();
+      }
     }
 
     void registerUnsuccessfulTaskEnd(boolean wasPreempted) {
       numScheduledTasks--;
+      if (metrics != null) {
+        metrics.decrRunningTasksCount();
+        metrics.incrSchedulableTasksCount();
+      }
       if (wasPreempted) {
         numPreemptedTasks++;
+        if (metrics != null) {
+          metrics.incrPreemptedTasksCount();
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java
new file mode 100644
index 0000000..c190be8
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.tezplugins.metrics;
+
+import org.apache.hadoop.metrics2.MetricsInfo;
+
+import com.google.common.base.Objects;
+
+/**
+ * Metrics information for llap task scheduler.
+ */
+public enum LlapTaskSchedulerInfo implements MetricsInfo {
+  SchedulerMetrics("Llap task scheduler related metrics"),
+  SchedulerClusterNodeCount("Number of nodes in the cluster"),
+  SchedulerExecutorsPerInstance("Total number of executor threads per node"),
+  SchedulerMemoryPerInstance("Total memory for executors per node in bytes"),
+  SchedulerCpuCoresPerInstance("Total CPU vCores per node"),
+  SchedulerDisabledNodeCount("Number of nodes disabled temporarily"),
+  SchedulerPendingTaskCount("Number of pending tasks"),
+  SchedulerSchedulableTaskCount("Current slots available for scheduling tasks"),
+  SchedulerSuccessfulTaskCount("Total number of successful tasks"),
+  SchedulerRunningTaskCount("Total number of running tasks"),
+  SchedulerPendingPreemptionTaskCount("Total number of tasks pending for pre-emption"),
+  SchedulerPreemptedTaskCount("Total number of tasks pre-empted"),
+  SchedulerCompletedDagCount("Number of DAGs completed");
+
+  private final String desc;
+
+  LlapTaskSchedulerInfo(String desc) {
+    this.desc = desc;
+  }
+
+  @Override
+  public String description() {
+    return desc;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add("name", name()).add("description", desc)
+        .toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
new file mode 100644
index 0000000..b3230e2
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.tezplugins.metrics;
+
+import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerClusterNodeCount;
+import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerCompletedDagCount;
+import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerCpuCoresPerInstance;
+import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerDisabledNodeCount;
+import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerExecutorsPerInstance;
+import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerMemoryPerInstance;
+import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerMetrics;
+import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPendingPreemptionTaskCount;
+import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPendingTaskCount;
+import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPreemptedTaskCount;
+import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerRunningTaskCount;
+import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerSchedulableTaskCount;
+import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerSuccessfulTaskCount;
+import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
+import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
+
+import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
+import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+
+/**
+ * Metrics about the llap daemon task scheduler.
+ */
+@Metrics(about = "LlapDaemon Task Scheduler Metrics", context = "scheduler")
+public class LlapTaskSchedulerMetrics implements MetricsSource {
+
+  private final String name;
+  private final JvmMetrics jvmMetrics;
+  private final String sessionId;
+  private final MetricsRegistry registry;
+  @Metric
+  MutableGaugeInt numExecutors;
+  @Metric
+  MutableGaugeLong memoryPerInstance;
+  @Metric
+  MutableGaugeInt cpuCoresPerInstance;
+  @Metric
+  MutableGaugeInt clusterNodeCount;
+  @Metric
+  MutableGaugeInt disabledNodeCount;
+  @Metric
+  MutableCounterInt pendingTasksCount;
+  @Metric
+  MutableCounterInt schedulableTasksCount;
+  @Metric
+  MutableCounterInt runningTasksCount;
+  @Metric
+  MutableCounterInt successfulTasksCount;
+  @Metric
+  MutableCounterInt preemptedTasksCount;
+  @Metric
+  MutableCounterInt completedDagcount;
+  @Metric
+  MutableCounterInt pendingPreemptionTasksCount;
+
+  private LlapTaskSchedulerMetrics(String displayName, JvmMetrics jm, String sessionId) {
+    this.name = displayName;
+    this.jvmMetrics = jm;
+    this.sessionId = sessionId;
+    this.registry = new MetricsRegistry("LlapTaskSchedulerMetricsRegistry");
+    this.registry.tag(ProcessName, MetricsUtils.METRICS_PROCESS_NAME).tag(SessionId, sessionId);
+  }
+
+  public static LlapTaskSchedulerMetrics create(String displayName, String sessionId) {
+    MetricsSystem ms = LlapMetricsSystem.instance();
+    JvmMetrics jm = JvmMetrics.create(MetricsUtils.METRICS_PROCESS_NAME, sessionId, ms);
+    return ms.register(displayName, "Llap Task Scheduler Metrics",
+        new LlapTaskSchedulerMetrics(displayName, jm, sessionId));
+  }
+
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean b) {
+    MetricsRecordBuilder rb = collector.addRecord(SchedulerMetrics)
+        .setContext("scheduler")
+        .tag(ProcessName, MetricsUtils.METRICS_PROCESS_NAME)
+        .tag(SessionId, sessionId);
+    getTaskSchedulerStats(rb);
+  }
+
+  public void setNumExecutors(int value) {
+    numExecutors.set(value);
+  }
+
+  public void setMemoryPerInstance(long value) {
+    memoryPerInstance.set(value);
+  }
+
+  public void setCpuCoresPerInstance(int value) {
+    cpuCoresPerInstance.set(value);
+  }
+
+  public void setClusterNodeCount(int value) {
+    clusterNodeCount.set(value);
+  }
+
+  public void setDisabledNodeCount(int value) {
+    disabledNodeCount.set(value);
+  }
+
+  public void incrPendingTasksCount() {
+    pendingTasksCount.incr();
+  }
+
+  public void decrPendingTasksCount() {
+    pendingTasksCount.incr(-1);
+  }
+
+  public void incrSchedulableTasksCount(int delta) {
+    schedulableTasksCount.incr(delta);
+  }
+
+  public void incrSchedulableTasksCount() {
+    schedulableTasksCount.incr();
+  }
+
+  public void decrSchedulableTasksCount() {
+    schedulableTasksCount.incr(-1);
+  }
+
+  public void incrSuccessfulTasksCount() {
+    successfulTasksCount.incr();
+  }
+
+  public void incrRunningTasksCount() {
+    runningTasksCount.incr();
+  }
+
+  public void decrRunningTasksCount() {
+    runningTasksCount.incr(-1);
+  }
+
+  public void incrPreemptedTasksCount() {
+    preemptedTasksCount.incr();
+  }
+
+  public void incrCompletedDagCount() {
+    completedDagcount.incr();
+  }
+
+  public void incrPendingPreemptionTasksCount() {
+    pendingPreemptionTasksCount.incr();
+  }
+
+  public void decrPendingPreemptionTasksCount() {
+    pendingPreemptionTasksCount.incr(-1);
+  }
+
+  private void getTaskSchedulerStats(MetricsRecordBuilder rb) {
+    rb.addGauge(SchedulerClusterNodeCount, clusterNodeCount.value())
+        .addGauge(SchedulerExecutorsPerInstance, numExecutors.value())
+        .addGauge(SchedulerMemoryPerInstance, memoryPerInstance.value())
+        .addGauge(SchedulerCpuCoresPerInstance, cpuCoresPerInstance.value())
+        .addGauge(SchedulerDisabledNodeCount, disabledNodeCount.value())
+        .addCounter(SchedulerPendingTaskCount, pendingTasksCount.value())
+        .addCounter(SchedulerSchedulableTaskCount, schedulableTasksCount.value())
+        .addCounter(SchedulerRunningTaskCount, runningTasksCount.value())
+        .addCounter(SchedulerSuccessfulTaskCount, successfulTasksCount.value())
+        .addCounter(SchedulerPendingPreemptionTaskCount, pendingPreemptionTasksCount.value())
+        .addCounter(SchedulerPreemptedTaskCount, preemptedTasksCount.value())
+        .addCounter(SchedulerCompletedDagCount, completedDagcount.value());
+  }
+
+  public JvmMetrics getJvmMetrics() {
+    return jvmMetrics;
+  }
+
+  public String getName() {
+    return name;
+  }
+}
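
Note: since getMetrics() above only publishes the record built in getTaskSchedulerStats(), a small unit test can exercise the source directly. A sketch of such a test follows, assuming the org.apache.hadoop.test.MetricsAsserts helpers from the hadoop-common test artifact are available to this module (they are not part of this patch):

    import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
    import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
    import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

    import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
    import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerMetrics;
    import org.junit.Test;

    // Hedged test sketch for the new metrics source.
    public class TestLlapTaskSchedulerMetricsSketch {
      @Test
      public void testCountsArePublished() {
        LlapMetricsSystem.initialize("LlapDaemon");
        try {
          LlapTaskSchedulerMetrics metrics =
              LlapTaskSchedulerMetrics.create("LlapTaskSchedulerMetrics-test", "test-session");
          metrics.setClusterNodeCount(3);
          metrics.incrPendingTasksCount();

          // getMetrics() drives LlapTaskSchedulerMetrics.getMetrics(), which emits the
          // record built by getTaskSchedulerStats().
          assertGauge("SchedulerClusterNodeCount", 3, getMetrics(metrics));
          assertCounter("SchedulerPendingTaskCount", 1, getMetrics(metrics));
        } finally {
          LlapMetricsSystem.shutdown();
        }
      }
    }
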

http://git-wip-us.apache.org/repos/asf/hive/blob/0ebcd938/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index 36d8ffd..b2cd55e 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -627,7 +627,7 @@ public class TestLlapTaskSchedulerService {
 
     public LlapTaskSchedulerServiceForTest(
         TaskSchedulerContext appClient, Clock clock) {
-      super(appClient, clock);
+      super(appClient, clock, false);
     }
 
     @Override