You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/03/28 21:49:02 UTC

hive git commit: HIVE-18971 : add HS2 WM metrics for use in Grafana and such (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master ad7652e34 -> cf6b63eea


HIVE-18971 : add HS2 WM metrics for use in Grafana and such (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cf6b63ee
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cf6b63ee
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cf6b63ee

Branch: refs/heads/master
Commit: cf6b63eeaf3f264fdc11899274b0120f42f1c694
Parents: ad7652e
Author: sergey <se...@apache.org>
Authored: Wed Mar 28 14:48:53 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Wed Mar 28 14:48:53 2018 -0700

----------------------------------------------------------------------
 .../hive/common/metrics/LegacyMetrics.java      |   5 +
 .../hive/common/metrics/common/Metrics.java     |  10 +-
 .../metrics/metrics2/CodahaleMetrics.java       |  16 ++
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +
 .../hive/llap/io/api/impl/LlapRecordReader.java |   2 +-
 .../ql/exec/tez/GuaranteedTasksAllocator.java   |  11 +-
 .../ql/exec/tez/QueryAllocationManager.java     |   8 +-
 .../hadoop/hive/ql/exec/tez/WmPoolMetrics.java  | 214 +++++++++++++++++++
 .../hive/ql/exec/tez/WorkloadManager.java       |  94 ++++++--
 .../exec/tez/monitoring/TezProgressMonitor.java |   1 +
 .../hive/ql/exec/tez/TestWorkloadManager.java   |   8 +-
 11 files changed, 351 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
index effe26b..d05c728 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
@@ -226,6 +226,11 @@ public class LegacyMetrics implements Metrics {
   }
 
   @Override
+  public void removeGauge(String name) {
+    //Not implemented: this is a no-op in the legacy metrics implementation.
+  }
+
+  @Override
   public void addRatio(String name, MetricsVariable<Integer> numerator,
                        MetricsVariable<Integer> denominator) {
     //Not implemented

http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
index 88c513b..99d3e57 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
@@ -92,7 +92,15 @@ public interface Metrics {
    * @param name name of gauge
    * @param variable variable to track.
    */
-  public void addGauge(String name, final MetricsVariable variable);
+  public void addGauge(String name, final MetricsVariable<?> variable);
+
+
+  /**
+   * Removes the gauge added by addGauge.
+   * @param name name of gauge
+   */
+  public void removeGauge(String name);
+
 
   /**
    * Add a ratio metric to track the correlation between two variables

http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
index a43b09d..4f35a6d 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
@@ -294,6 +294,21 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co
     addGaugeInternal(name, gauge);
   }
 
+
+  @Override
+  public void removeGauge(String name) {
+    try {
+      gaugesLock.lock();
+      gauges.remove(name);
+      // Only ask the registry to remove the gauge if one is registered under this name.
+      if (metricRegistry.getGauges().containsKey(name)) {
+        metricRegistry.remove(name);
+      }
+    } finally {
+      gaugesLock.unlock();
+    }
+  }
+
   @Override
   public void addRatio(String name, MetricsVariable<Integer> numerator,
                            MetricsVariable<Integer> denominator) {
@@ -409,6 +424,7 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co
         throw new IllegalArgumentException(e);
       }
       try {
+        // Note: Hadoop metric reporter does not support tags. We create a single reporter for all metrics.
         Constructor constructor = name.getConstructor(MetricRegistry.class, HiveConf.class);
         CodahaleReporter reporter = (CodahaleReporter) constructor.newInstance(metricRegistry, conf);
         reporter.start();

http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a6866e7..c2c3991 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2486,6 +2486,8 @@ public class HiveConf extends Configuration {
         "Applies when a user specifies a target WM pool in the JDBC connection string. If\n" +
         "false, the user can only specify a pool he is mapped to (e.g. make a choice among\n" +
         "multiple group mappings); if true, the user can specify any existing pool."),
+    HIVE_SERVER2_WM_POOL_METRICS("hive.server2.wm.pool.metrics", true,
+        "Whether per-pool WM metrics should be enabled."),
     HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT("hive.server2.tez.wm.am.registry.timeout", "30s",
         new TimeValidator(TimeUnit.SECONDS),
         "The timeout for AM registry registration, after which (on attempting to use the\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 3a2c19a..7451ea4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -476,7 +476,7 @@ class LlapRecordReader
           isClosed, isInterrupted, pendingError.get(), queue.size());
     }
     LlapIoImpl.LOG.info("Maximum queue length observed " + maxQueueSize);
-    LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged!
+    LlapIoImpl.LOG.info("Llap counters: {}" , counters); // This is where counters are logged!
     feedback.stop();
     isClosed = true;
     rethrowErrorIfAny(pendingError.get());

http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
index a52928c..d3b4e07 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -102,13 +102,20 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
   }
 
   @Override
-  public void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessionsToUpdate) {
+  public int translateAllocationToCpus(double allocation) {
+    // Do not make a remote call under any circumstances - this is supposed to be async.
+    return (int)Math.round(getExecutorCount(false) * allocation);
+  }
+
+  @Override
+  public int updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessionsToUpdate) {
     // Do not make a remote call under any circumstances - this is supposed to be async.
     int totalCount = getExecutorCount(false);
     int totalToDistribute = -1;
     if (totalMaxAlloc != null) {
       totalToDistribute = (int)Math.round(totalCount * totalMaxAlloc);
     }
+    int totalDistributed = 0;
     double lastDelta = 0;
     for (int i = 0; i < sessionsToUpdate.size(); ++i) {
       WmTezSession session = sessionsToUpdate.get(i);
@@ -139,8 +146,10 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
         totalToDistribute -= intAlloc;
       }
       // This will only send update if it's necessary.
+      totalDistributed += intAlloc;
       updateSessionAsync(session, intAlloc);
     }
+    return totalDistributed;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
index 9885ce7..32702c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
@@ -32,8 +32,14 @@ interface QueryAllocationManager {
    *                      avoid various artifacts, esp. with small numbers and double weirdness.
    *                      Null means the total is unknown.
    * @param sessions Sessions to update based on their allocation fraction.
+   * @return The number of executors/cpus allocated.
    */
-  void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions);
+  int updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions);
+
+  /**
+   * @return the number of CPUs equivalent to the allocation fraction, for informational purposes.
+   */
+  int translateAllocationToCpus(double allocation);
 
   /**
    * Sets a callback to be invoked on cluster changes relevant to resource allocation.

http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmPoolMetrics.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmPoolMetrics.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmPoolMetrics.java
new file mode 100644
index 0000000..19b035e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmPoolMetrics.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
+import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.impl.MsInfo;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MutableMetric;
+
+/**
+ * A wrapper for metrics for a single WM pool. This outputs metrics both to Codahale and standard
+ * Hadoop metrics, in parallel. The Codahale output is prefixed with the pool name and is mostly
+ * for the JMX view, or to look at when Hadoop metrics are not set up. Hadoop metrics are output
+ * because they can be tagged properly rather than prefixed, so they are better for dashboards.
+ */
+public class WmPoolMetrics implements MetricsSource {
+  private final String poolName, sourceName;
+  private MetricsSystem ms;
+  @SuppressWarnings("unused") // Metrics system will get this via reflection 0_o
+  private final MetricsRegistry registry;
+
+  // Codahale. We just include the pool name in the counter name.
+  private List<String> codahaleGaugeNames;
+  private Map<String, MutableMetric> allMetrics;
+
+  @Metric("Number of guaranteed cluster executors given to queries")
+  MutableGaugeInt numExecutors;
+  @Metric("Number of guaranteed cluster executors allocated")
+  MutableGaugeInt numExecutorsMax;
+  @Metric("Number of parallel queries allowed to run")
+  MutableGaugeInt numParallelQueries;
+  @Metric("Number of queries running")
+  MutableCounterInt numRunningQueries;
+  @Metric("Number of queries queued")
+  MutableCounterInt numQueuedQueries;
+
+  // TODO: these would need to be propagated from AM via progress.
+  // @Metric("Number of allocated guaranteed executors in use"),
+  // @Metric("Number of speculative executors in use")
+
+  public WmPoolMetrics(String poolName, MetricsSystem ms) {
+    this.poolName = poolName;
+    this.sourceName = "WmPoolMetrics." + poolName;
+    this.ms = ms;
+
+    this.registry = new MetricsRegistry(sourceName);
+  }
+
+
+  public void initAfterRegister() {
+    // Make sure we capture the same metrics as Hadoop2 metrics system, via annotations.
+    if (allMetrics != null) return;
+    allMetrics = new HashMap<>();
+    for (Field field : this.getClass().getDeclaredFields()) {
+      for (Annotation annotation : field.getAnnotations()) {
+        if (!(annotation instanceof Metric)) continue;
+        try {
+          field.setAccessible(true);
+          allMetrics.put(field.getName(), (MutableMetric) field.get(this));
+        } catch (IllegalAccessException ex) {
+          break; // Not expected, access by the same class.
+        }
+        break;
+      }
+    }
+
+    // Set up codahale if enabled; we cannot tag the values so just prefix them for the JMX view.
+    Metrics chMetrics = MetricsFactory.getInstance();
+    if (!(chMetrics instanceof CodahaleMetrics)) return;
+
+    List<String> codahaleNames = new ArrayList<>();
+    for (Map.Entry<String, MutableMetric> e : allMetrics.entrySet()) {
+      MutableMetric metric = e.getValue();
+      MetricsVariable<?> var = null;
+      if (metric instanceof MutableCounterInt) {
+        var = new CodahaleCounterWrapper((MutableCounterInt) metric);
+      } else if (metric instanceof MutableGaugeInt) {
+        var = new CodahaleGaugeWrapper((MutableGaugeInt) metric);
+      }
+      if (var == null) continue; // Unexpected metric type.
+      String name = "WM_" + poolName + "_" + e.getKey();
+      codahaleNames.add(name);
+      chMetrics.addGauge(name, var);
+    }
+    this.codahaleGaugeNames = codahaleNames;
+  }
+
+
+  public void setParallelQueries(int size) {
+    numParallelQueries.set(size);
+  }
+
+  public void setExecutors(int allocation) {
+    numExecutors.set(allocation);
+  }
+
+  public void setMaxExecutors(int allocation) {
+    numExecutorsMax.set(allocation);
+  }
+
+  public void addQueuedQuery() {
+    numQueuedQueries.incr();
+  }
+
+  public void addRunningQuery() {
+    numRunningQueries.incr();
+  }
+
+  public void removeQueuedQueries(int num) {
+    numQueuedQueries.incr(-num);
+  }
+
+  public void removeRunningQueries(int num) {
+    numRunningQueries.incr(-num);
+  }
+
+  public void moveQueuedToRunning() {
+    numQueuedQueries.incr(-1);
+    numRunningQueries.incr();
+  }
+
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    // We could also have one MetricsSource for all the pools and add all the pools to the collector
+    // in its getMetrics call (as separate records). It is not clear whether that is supported.
+    // Also, we'd have to initialize the metrics ourselves instead of using @Metric annotation.
+    MetricsRecordBuilder rb = collector.addRecord("WmPoolMetrics." + poolName)
+        .setContext("HS2").tag(MsInfo.SessionId, poolName);
+    if (allMetrics == null) {
+      initAfterRegister(); // This happens if register calls getMetrics.
+    }
+    for (MutableMetric metric : allMetrics.values()) {
+      metric.snapshot(rb, all);
+    }
+  }
+
+  public static WmPoolMetrics create(String poolName, MetricsSystem ms) {
+    WmPoolMetrics metrics = new WmPoolMetrics(poolName, ms);
+    metrics = ms.register(metrics.sourceName, "WM " + poolName + " pool metrics", metrics);
+    metrics.initAfterRegister();
+    return metrics;
+  }
+
+  public void destroy() {
+    ms.unregisterSource(sourceName);
+    ms = null;
+    if (codahaleGaugeNames != null) {
+      Metrics metrics = MetricsFactory.getInstance();
+      for (String chgName : codahaleGaugeNames) {
+        metrics.removeGauge(chgName);
+      }
+      codahaleGaugeNames = null;
+    }
+  }
+
+  private static class CodahaleGaugeWrapper implements MetricsVariable<Integer> {
+    private final MutableGaugeInt mm;
+
+    public CodahaleGaugeWrapper(MutableGaugeInt mm) {
+      this.mm = mm;
+    }
+
+    @Override
+    public Integer getValue() {
+      return mm.value();
+    }
+  }
+
+  private static class CodahaleCounterWrapper implements MetricsVariable<Integer> {
+    private final MutableCounterInt mm;
+
+    public CodahaleCounterWrapper(MutableCounterInt mm) {
+      this.mm = mm;
+    }
+
+    @Override
+    public Integer getValue() {
+      return mm.value();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index f0e620c..96158fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -28,6 +28,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -50,6 +51,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -70,6 +72,8 @@ import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
 import org.apache.hadoop.hive.ql.wm.WmContext;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hive.common.util.Ref;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.codehaus.jackson.annotate.JsonAutoDetect;
@@ -107,8 +111,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   private final String yarnQueue;
   private final int amRegistryTimeoutMs;
   private final boolean allowAnyPool;
+  private final MetricsSystem metricsSystem;
   // Note: it's not clear that we need to track this - unlike PoolManager we don't have non-pool
-  //       sessions, so the pool itself could internally track the sessions it gave out, since
+  //       sessions, so the pool itself could internally track the sessions it gave out, since
   //       calling close on an unopened session is probably harmless.
   private final IdentityHashMap<TezSessionPoolSession, Boolean> openSessions =
       new IdentityHashMap<>();
@@ -216,6 +221,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       .setDaemon(true).setNameFormat("Workload management timeout thread").build());
 
     allowAnyPool = HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC);
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_POOL_METRICS)) {
+      metricsSystem = DefaultMetricsSystem.instance();
+    } else {
+      metricsSystem = null;
+    }
 
     wmThread = new Thread(() -> runWmThread(), "Workload management master");
     wmThread.setDaemon(true);
@@ -755,7 +765,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       WmEvent moveEvent = new WmEvent(WmEvent.EventType.MOVE);
       // remove from src pool
       RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
-          moveSession.srcSession, poolsToRedistribute, true);
+          moveSession.srcSession, poolsToRedistribute, true, true);
       if (rr == RemoveSessionResult.OK) {
         // check if there is capacity in dest pool, if so move else kill the session
         if (capacityAvailable(destPoolName)) {
@@ -859,7 +869,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       reuseRequest.future.setException(new AssertionError("Invalid reuse attempt"));
     }
     session.setQueryId(null);
-    return checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, isReturn);
+    return checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, isReturn, true);
   }
 
   private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session,
@@ -876,7 +886,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     // anything. Instead, we will try to give out an existing session from the pool, and restart
     // the problematic one in background.
     String poolName = session.getPoolName();
-    RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, false);
+    // Do not update metrics, we'd immediately add the session back if we are able to remove.
+    RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
+        session, poolsToRedistribute, false, false);
     switch (rr) {
     case OK:
       // If pool didn't exist, checkAndRemoveSessionFromItsPool wouldn't have returned OK.
@@ -885,6 +897,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
           session.getWmContext(), session.extractHiveResources());
       // We have just removed the session from the same pool, so don't check concurrency here.
       pool.initializingSessions.add(sw);
+      // Do not update metrics - see above.
       sw.start();
       syncWork.toRestartInUse.add(session);
       return;
@@ -920,10 +933,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       // This session is bad, so don't allow reuse; just convert it to normal get.
       reuseRequest.sessionToReuse = null;
     }
-    // TODO: we should communicate this to the user more explicitly (use kill query API, or
-    //       add an option for bg kill checking to TezTask/monitor?
+
     // We are assuming the update-error AM is bad and just try to kill it.
-    RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, null);
+    RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
+        session, poolsToRedistribute, null, true);
     switch (rr) {
     case OK:
     case NOT_FOUND:
@@ -989,7 +1002,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         }
         PoolState state = oldPools == null ? null : oldPools.remove(fullName);
         if (state == null) {
-          state = new PoolState(fullName, qp, fraction, pool.getSchedulingPolicy());
+          state = new PoolState(fullName, qp, fraction, pool.getSchedulingPolicy(), metricsSystem);
         } else {
           // This will also take care of the queries if query parallelism changed.
           state.update(qp, fraction, syncWork, e, pool.getSchedulingPolicy());
@@ -1001,6 +1014,12 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         totalQueryParallelism += qp;
       }
     }
+    for (PoolState pool : pools.values()) {
+      if (pool.metrics != null) {
+        pool.metrics.setMaxExecutors(
+            allocationManager.translateAllocationToCpus(pool.finalFractionRemaining));
+      }
+    }
     // TODO: in the current impl, triggers are added to RP. For tez, no pool triggers (mapping between trigger name and
     // pool name) will exist which means all triggers applies to tez. For LLAP, pool triggers has to exist for attaching
     // triggers to specific pools.
@@ -1094,12 +1113,18 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       String oldPoolName = req.sessionToReuse.getPoolName();
       oldPool = pools.get(oldPoolName);
       RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
-          req.sessionToReuse, poolsToRedistribute, true);
+          req.sessionToReuse, poolsToRedistribute, true, false);
       if (rr != RemoveSessionResult.OK) {
+        if (oldPool.metrics != null) {
+          oldPool.metrics.removeRunningQueries(1);
+        }
         // Abandon the reuse attempt.
         returnSessionOnFailedReuse(req, syncWork, null);
         req.sessionToReuse = null;
       } else if (pool.getTotalActiveSessions() + pool.queue.size() >= pool.queryParallelism) {
+        if (oldPool.metrics != null) {
+          oldPool.metrics.removeRunningQueries(1);
+        }
         // One cannot simply reuse the session if there are other queries waiting; to maintain
         // fairness, we'll try to take a query slot instantly, and if that fails we'll return
         // this session back to the pool and give the user a new session later.
@@ -1113,6 +1138,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       req.sessionToReuse.setPoolName(poolName);
       req.sessionToReuse.setQueueName(yarnQueue);
       req.sessionToReuse.setQueryId(req.queryId);
+      // Do not update metrics - we didn't update on removal.
       pool.sessions.add(req.sessionToReuse);
       if (pool != oldPool) {
         poolsToRedistribute.add(poolName);
@@ -1123,6 +1149,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
 
     // Otherwise, queue the session and make sure we update this pool.
     pool.queue.addLast(req);
+    if (pool.metrics != null) {
+      pool.metrics.addQueuedQuery();
+    }
     poolsToRedistribute.add(poolName);
   }
 
@@ -1134,7 +1163,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
 
     // 1. First, start the queries from the queue.
     int queriesToStart = Math.min(pool.queue.size(),
-      pool.queryParallelism - pool.getTotalActiveSessions());
+        pool.queryParallelism - pool.getTotalActiveSessions());
 
     if (queriesToStart > 0) {
       LOG.info("Starting {} queries in pool {}", queriesToStart, pool);
@@ -1145,6 +1174,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
     for (int i = 0; i < queriesToStart; ++i) {
       GetRequest queueReq = pool.queue.pollFirst();
+      if (pool.metrics != null) {
+        pool.metrics.moveQueuedToRunning();
+      }
       assert queueReq.sessionToReuse == null;
       // Note that in theory, we are guaranteed to have a session waiting for us here, but
       // the expiration, failures, etc. may cause one to be missing pending restart.
@@ -1170,7 +1202,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     // logic to be consistent between all the separate calls in one master thread processing round.
     // Note: If allocation manager does not have cluster state, it won't update anything. When the
     //       cluster state changes, it will notify us, and we'd update the queries again.
-    allocationManager.updateSessionsAsync(totalAlloc, pool.sessions);
+    int cpusAllocated = allocationManager.updateSessionsAsync(totalAlloc, pool.sessions);
+    if (pool.metrics != null) {
+      pool.metrics.setExecutors(cpusAllocated);
+      if (cpusAllocated > 0) {
+        // Update max executors now that cluster info is definitely available.
+        pool.metrics.setMaxExecutors(allocationManager.translateAllocationToCpus(totalAlloc));
+      }
+    }
   }
 
   private void returnSessionOnFailedReuse(
@@ -1181,7 +1220,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     session.setQueryId(null);
     if (poolsToRedistribute != null) {
       RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
-          session, poolsToRedistribute, true);
+          session, poolsToRedistribute, true, true);
       // The session cannot have been killed just now; this happens after all the kills in
       // the current iteration, so we would have cleared sessionToReuse when killing this.
       boolean isOk = (rr == RemoveSessionResult.OK);
@@ -1217,8 +1256,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
    *         thread (so we are dealing with an outdated request); null if the session should be
    *         in WM but wasn't found in the requisite pool (internal error?).
    */
-  private RemoveSessionResult checkAndRemoveSessionFromItsPool(
-      WmTezSession session, Set<String> poolsToRedistribute, Boolean isSessionOk) {
+  private RemoveSessionResult checkAndRemoveSessionFromItsPool(WmTezSession session,
+      Set<String> poolsToRedistribute, Boolean isSessionOk, boolean updateMetrics) {
     // It is possible for some request to be queued after a main thread has decided to kill this
     // session; on the next iteration, we'd be processing that request with an irrelevant session.
     if (session.isIrrelevantForWm()) {
@@ -1237,6 +1276,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       PoolState pool = pools.get(poolName);
       session.clearWm();
       if (pool != null && pool.sessions.remove(session)) {
+        if (updateMetrics && pool.metrics != null) {
+          pool.metrics.removeRunningQueries(1);
+        }
         return RemoveSessionResult.OK;
       }
     }
@@ -1255,6 +1297,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
 
     PoolState destPool = pools.get(destPoolName);
     if (destPool != null && destPool.sessions.add(session)) {
+      if (destPool.metrics != null) {
+        destPool.metrics.addRunningQuery();
+      }
       session.setPoolName(destPoolName);
       updateTriggers(session);
       poolsToRedistribute.add(destPoolName);
@@ -1675,6 +1720,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     // Note: the list is expected to be a few items; if it's longer we may want an IHM.
     private final LinkedList<WmTezSession> sessions = new LinkedList<>();
     private final LinkedList<GetRequest> queue = new LinkedList<>();
+    private final WmPoolMetrics metrics;
 
     private final String fullName;
     private double finalFraction;
@@ -1684,8 +1730,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     private WMPoolSchedulingPolicy schedulingPolicy;
 
     public PoolState(String fullName, int queryParallelism, double fraction,
-        String schedulingPolicy) {
+        String schedulingPolicy, MetricsSystem ms) {
       this.fullName = fullName;
+      // TODO: this actually calls the metrics system and getMetrics, which may be expensive.
+      //       For now it looks like it should be OK to do on the WM thread.
+      this.metrics = ms == null ? null : WmPoolMetrics.create(fullName, ms);
       update(queryParallelism, fraction, null, null, schedulingPolicy);
     }
 
@@ -1697,6 +1746,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         WmThreadSyncWork syncWork, EventState e, String schedulingPolicy) {
       this.finalFraction = this.finalFractionRemaining = fraction;
       this.queryParallelism = queryParallelism;
+      if (metrics != null) {
+        metrics.setParallelQueries(queryParallelism);
+      }
       try {
         this.schedulingPolicy = MetaStoreUtils.parseSchedulingPolicy(schedulingPolicy);
       } catch (IllegalArgumentException ex) {
@@ -1716,6 +1768,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       // We will requeue, and not kill, the queries that are not running yet.
       // Insert them all before the get requests from this iteration.
       GetRequest req;
+      if (metrics != null) {
+        metrics.removeQueuedQueries(queue.size());
+      }
+
       while ((req = queue.pollLast()) != null) {
         e.getRequests.addFirst(req);
       }
@@ -1727,6 +1783,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       // All the pending get requests should just be requeued elsewhere.
       // Note that we never queue session reuse so sessionToReuse would be null.
       globalQueue.addAll(0, queue);
+      if (metrics != null) {
+        metrics.removeQueuedQueries(queue.size());
+        metrics.destroy();
+      }
       queue.clear();
     }
 
@@ -1774,6 +1834,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     private void extractAllSessionsToKill(String killReason,
         IdentityHashMap<WmTezSession, GetRequest> toReuse,
         WmThreadSyncWork syncWork) {
+      int totalCount = sessions.size() + initializingSessions.size();
       for (WmTezSession sessionToKill : sessions) {
         resetRemovedSessionToKill(syncWork.toKillQuery,
           new KillQueryContext(sessionToKill, killReason), toReuse);
@@ -1791,6 +1852,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
           new KillQueryContext(sessionToKill, killReason), toReuse);
       }
       initializingSessions.clear();
+      if (metrics != null) {
+        metrics.removeRunningQueries(totalCount);
+      }
     }
 
     public void setTriggers(final LinkedList<Trigger> triggers) {

http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
index a14cdb6..b0c1659 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
@@ -94,6 +94,7 @@ class TezProgressMonitor implements ProgressMonitor {
         if (progress != null) {
           // Map 1 .......... container  SUCCEEDED      7          7        0        0       0       0
 
+          // TODO: can we pass custom things through the progress?
           results.add(
             Arrays.asList(
               getNameWithProgress(vertexName, progress.succeededTaskCount, progress.totalTaskCount),

http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 20a5947..fb32e90 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -107,8 +107,9 @@ public class TestWorkloadManager {
     }
 
     @Override
-    public void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions) {
+    public int updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions) {
       isCalled = true;
+      return 0;
     }
     
     @Override
@@ -123,6 +124,11 @@ public class TestWorkloadManager {
     @Override
     public void setClusterChangedCallback(Runnable clusterChangedCallback) {
     }
+
+    @Override
+    public int translateAllocationToCpus(double allocation) {
+      return 0;
+    }
   }
 
   public static WMResourcePlan plan() {