You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/03/28 21:49:02 UTC
hive git commit: HIVE-18971 : add HS2 WM metrics for use in Grafana
and such (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master ad7652e34 -> cf6b63eea
HIVE-18971 : add HS2 WM metrics for use in Grafana and such (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cf6b63ee
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cf6b63ee
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cf6b63ee
Branch: refs/heads/master
Commit: cf6b63eeaf3f264fdc11899274b0120f42f1c694
Parents: ad7652e
Author: sergey <se...@apache.org>
Authored: Wed Mar 28 14:48:53 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Wed Mar 28 14:48:53 2018 -0700
----------------------------------------------------------------------
.../hive/common/metrics/LegacyMetrics.java | 5 +
.../hive/common/metrics/common/Metrics.java | 10 +-
.../metrics/metrics2/CodahaleMetrics.java | 16 ++
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +
.../hive/llap/io/api/impl/LlapRecordReader.java | 2 +-
.../ql/exec/tez/GuaranteedTasksAllocator.java | 11 +-
.../ql/exec/tez/QueryAllocationManager.java | 8 +-
.../hadoop/hive/ql/exec/tez/WmPoolMetrics.java | 214 +++++++++++++++++++
.../hive/ql/exec/tez/WorkloadManager.java | 94 ++++++--
.../exec/tez/monitoring/TezProgressMonitor.java | 1 +
.../hive/ql/exec/tez/TestWorkloadManager.java | 8 +-
11 files changed, 351 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
index effe26b..d05c728 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
@@ -226,6 +226,11 @@ public class LegacyMetrics implements Metrics {
}
@Override
+ public void removeGauge(String name) {
+ //Not implemented; this is a no-op. NOTE(review): confirm addGauge here registers nothing that needs undoing.
+ }
+
+ @Override
public void addRatio(String name, MetricsVariable<Integer> numerator,
MetricsVariable<Integer> denominator) {
//Not implemented
http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
index 88c513b..99d3e57 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
@@ -92,7 +92,15 @@ public interface Metrics {
* @param name name of gauge
* @param variable variable to track.
*/
- public void addGauge(String name, final MetricsVariable variable);
+ public void addGauge(String name, final MetricsVariable<?> variable);
+
+
+ /**
+ * Removes the gauge previously added by addGauge.
+ * @param name name of the gauge to remove
+ */
+ public void removeGauge(String name);
+
/**
* Add a ratio metric to track the correlation between two variables
http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
index a43b09d..4f35a6d 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
@@ -294,6 +294,21 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co
addGaugeInternal(name, gauge);
}
+
+ @Override
+ public void removeGauge(String name) { // Drops the gauge from both the local gauges map and the registry.
+ try {
+ gaugesLock.lock(); // NOTE(review): the usual idiom calls lock() before `try`, so `finally` never unlocks an unheld lock.
+ gauges.remove(name);
+ // Only remove from the Codahale registry if the gauge is actually registered there.
+ if (metricRegistry.getGauges().containsKey(name)) {
+ metricRegistry.remove(name);
+ }
+ } finally {
+ gaugesLock.unlock();
+ }
+ }
+
@Override
public void addRatio(String name, MetricsVariable<Integer> numerator,
MetricsVariable<Integer> denominator) {
@@ -409,6 +424,7 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co
throw new IllegalArgumentException(e);
}
try {
+ // Note: Hadoop metric reporter does not support tags. We create a single reporter for all metrics.
Constructor constructor = name.getConstructor(MetricRegistry.class, HiveConf.class);
CodahaleReporter reporter = (CodahaleReporter) constructor.newInstance(metricRegistry, conf);
reporter.start();
http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a6866e7..c2c3991 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2486,6 +2486,8 @@ public class HiveConf extends Configuration {
"Applies when a user specifies a target WM pool in the JDBC connection string. If\n" +
"false, the user can only specify a pool he is mapped to (e.g. make a choice among\n" +
"multiple group mappings); if true, the user can specify any existing pool."),
+ HIVE_SERVER2_WM_POOL_METRICS("hive.server2.wm.pool.metrics", true,
+ "Whether per-pool WM metrics should be enabled."),
HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT("hive.server2.tez.wm.am.registry.timeout", "30s",
new TimeValidator(TimeUnit.SECONDS),
"The timeout for AM registry registration, after which (on attempting to use the\n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 3a2c19a..7451ea4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -476,7 +476,7 @@ class LlapRecordReader
isClosed, isInterrupted, pendingError.get(), queue.size());
}
LlapIoImpl.LOG.info("Maximum queue length observed " + maxQueueSize);
- LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged!
+ LlapIoImpl.LOG.info("Llap counters: {}" , counters); // This is where counters are logged!
feedback.stop();
isClosed = true;
rethrowErrorIfAny(pendingError.get());
http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
index a52928c..d3b4e07 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -102,13 +102,20 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
}
@Override
- public void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessionsToUpdate) {
+ public int translateAllocationToCpus(double allocation) { // Informational: allocation fraction -> executor count.
+ // Do not make a remote call under any circumstances; same getExecutorCount(false) call as updateSessionsAsync.
+ return (int)Math.round(getExecutorCount(false) * allocation); // Rounded to the nearest whole executor.
+ }
+
+ @Override
+ public int updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessionsToUpdate) {
// Do not make a remote call under any circumstances - this is supposed to be async.
int totalCount = getExecutorCount(false);
int totalToDistribute = -1;
if (totalMaxAlloc != null) {
totalToDistribute = (int)Math.round(totalCount * totalMaxAlloc);
}
+ int totalDistributed = 0;
double lastDelta = 0;
for (int i = 0; i < sessionsToUpdate.size(); ++i) {
WmTezSession session = sessionsToUpdate.get(i);
@@ -139,8 +146,10 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
totalToDistribute -= intAlloc;
}
// This will only send update if it's necessary.
+ totalDistributed += intAlloc;
updateSessionAsync(session, intAlloc);
}
+ return totalDistributed;
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
index 9885ce7..32702c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
@@ -32,8 +32,14 @@ interface QueryAllocationManager {
* avoid various artifacts, esp. with small numbers and double weirdness.
* Null means the total is unknown.
* @param sessions Sessions to update based on their allocation fraction.
+ * @return The number of executors/cpus allocated.
*/
- void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions);
+ int updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions);
+
+ /** Translates an allocation fraction (the {@code allocation} argument) into an equivalent CPU count.
+ * @return the number of CPUs equivalent to the allocation fraction, for information purposes only.
+ */
+ int translateAllocationToCpus(double allocation);
/**
* Sets a callback to be invoked on cluster changes relevant to resource allocation.
http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmPoolMetrics.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmPoolMetrics.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmPoolMetrics.java
new file mode 100644
index 0000000..19b035e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmPoolMetrics.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
+import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.impl.MsInfo;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MutableMetric;
+
+/**
+ * A wrapper for metrics for single WM pool. This outputs metrics both to Codahale and standard
+ * Hadoop metrics, in parallel. The codahale output is prefixed with pool name and is mostly
+ * for the JMX view, or to look at when Hadoop metrics are not set up. Hadoop metrics are output
+ * because they can be tagged properly rather than prefixed, so they are better for dashboards.
+ */
+public class WmPoolMetrics implements MetricsSource {
+ private final String poolName, sourceName;
+ private MetricsSystem ms; // Set to null by destroy().
+ @SuppressWarnings("unused") // Metrics system will get this via reflection 0_o
+ private final MetricsRegistry registry;
+
+ // Codahale cannot tag values, so the pool name is embedded in each gauge name instead.
+ private List<String> codahaleGaugeNames; // Gauges added by initAfterRegister(); null if none were added.
+ private Map<String, MutableMetric> allMetrics; // @Metric field name -> metric; null until initAfterRegister().
+ // The @Metric fields below are never assigned in this class; presumably MetricsSystem.register fills them in.
+ @Metric("Number of guaranteed cluster executors given to queries")
+ MutableGaugeInt numExecutors;
+ @Metric("Number of guaranteed cluster executors allocated")
+ MutableGaugeInt numExecutorsMax;
+ @Metric("Number of parallel queries allowed to run")
+ MutableGaugeInt numParallelQueries;
+ @Metric("Number of queries running")
+ MutableCounterInt numRunningQueries;
+ @Metric("Number of queries queued")
+ MutableCounterInt numQueuedQueries;
+
+ // TODO: these would need to be propagated from AM via progress.
+ // @Metric("Number of allocated guaranteed executors in use"),
+ // @Metric("Number of speculative executors in use")
+ // Note: the constructor does not register the source; use create() to get a registered instance.
+ public WmPoolMetrics(String poolName, MetricsSystem ms) {
+ this.poolName = poolName;
+ this.sourceName = "WmPoolMetrics." + poolName;
+ this.ms = ms;
+
+ this.registry = new MetricsRegistry(sourceName);
+ }
+
+ // Idempotent. Collects the @Metric fields into allMetrics and, if Codahale is in use, mirrors them as gauges.
+ public void initAfterRegister() {
+ // Make sure we capture the same metrics as Hadoop2 metrics system, via annotations.
+ if (allMetrics != null) return; // Already done, e.g. from getMetrics() during register().
+ allMetrics = new HashMap<>();
+ for (Field field : this.getClass().getDeclaredFields()) {
+ for (Annotation annotation : field.getAnnotations()) {
+ if (!(annotation instanceof Metric)) continue;
+ try {
+ field.setAccessible(true);
+ allMetrics.put(field.getName(), (MutableMetric) field.get(this));
+ } catch (IllegalAccessException ex) {
+ break; // Not expected, access by the same class.
+ }
+ break; // This field's @Metric has been handled; move on to the next field.
+ }
+ }
+
+ // Set up codahale if enabled; we cannot tag the values so just prefix them for the JMX view.
+ Metrics chMetrics = MetricsFactory.getInstance();
+ if (!(chMetrics instanceof CodahaleMetrics)) return; // Not Codahale: Hadoop metrics2 output only.
+
+ List<String> codahaleNames = new ArrayList<>();
+ for (Map.Entry<String, MutableMetric> e : allMetrics.entrySet()) {
+ MutableMetric metric = e.getValue();
+ MetricsVariable<?> var = null;
+ if (metric instanceof MutableCounterInt) {
+ var = new CodahaleCounterWrapper((MutableCounterInt) metric);
+ } else if (metric instanceof MutableGaugeInt) {
+ var = new CodahaleGaugeWrapper((MutableGaugeInt) metric);
+ }
+ if (var == null) continue; // Unexpected metric type.
+ String name = "WM_" + poolName + "_" + e.getKey();
+ codahaleNames.add(name);
+ chMetrics.addGauge(name, var);
+ }
+ this.codahaleGaugeNames = codahaleNames; // Kept so destroy() can remove them.
+ }
+
+ // Simple mutators for the individual pool metrics.
+ public void setParallelQueries(int size) {
+ numParallelQueries.set(size);
+ }
+
+ public void setExecutors(int allocation) {
+ numExecutors.set(allocation);
+ }
+
+ public void setMaxExecutors(int allocation) {
+ numExecutorsMax.set(allocation);
+ }
+
+ public void addQueuedQuery() {
+ numQueuedQueries.incr();
+ }
+
+ public void addRunningQuery() {
+ numRunningQueries.incr();
+ }
+ // NOTE(review): the removals below decrement MutableCounterInt values, i.e. use them as up/down gauges, not monotonic counters.
+ public void removeQueuedQueries(int num) {
+ numQueuedQueries.incr(-num);
+ }
+
+ public void removeRunningQueries(int num) {
+ numRunningQueries.incr(-num);
+ }
+ // Moves one query from the queued count to the running count.
+ public void moveQueuedToRunning() {
+ numQueuedQueries.incr(-1);
+ numRunningQueries.incr();
+ }
+ // MetricsSource callback: emits one record for this pool, tagged with the pool name.
+ @Override
+ public void getMetrics(MetricsCollector collector, boolean all) {
+ // We could also have one MetricsSource for all the pools and add all the pools to the collector
+ // in its getMetrics call (as separate records). Not clear if that's supported.
+ // Also, we'd have to initialize the metrics ourselves instead of using @Metric annotation.
+ MetricsRecordBuilder rb = collector.addRecord("WmPoolMetrics." + poolName)
+ .setContext("HS2").tag(MsInfo.SessionId, poolName); // The SessionId tag is reused to carry the pool name.
+ if (allMetrics == null) {
+ initAfterRegister(); // This happens if register calls getMetrics.
+ }
+ for (MutableMetric metric : allMetrics.values()) {
+ metric.snapshot(rb, all);
+ }
+ }
+ // Creates the source, registers it under sourceName, and initializes the instance returned by register().
+ public static WmPoolMetrics create(String poolName, MetricsSystem ms) {
+ WmPoolMetrics metrics = new WmPoolMetrics(poolName, ms);
+ metrics = ms.register(metrics.sourceName, "WM " + poolName + " pool metrics", metrics);
+ metrics.initAfterRegister();
+ return metrics;
+ }
+ // Unregisters the source and removes any Codahale gauges. NOTE(review): a second call NPEs because ms is nulled.
+ public void destroy() {
+ ms.unregisterSource(sourceName);
+ ms = null;
+ if (codahaleGaugeNames != null) {
+ Metrics metrics = MetricsFactory.getInstance();
+ for (String chgName : codahaleGaugeNames) {
+ metrics.removeGauge(chgName);
+ }
+ codahaleGaugeNames = null;
+ }
+ }
+ // Adapters exposing metrics2 int values as Codahale MetricsVariable gauges.
+ private static class CodahaleGaugeWrapper implements MetricsVariable<Integer> {
+ private final MutableGaugeInt mm;
+
+ public CodahaleGaugeWrapper(MutableGaugeInt mm) {
+ this.mm = mm;
+ }
+
+ @Override
+ public Integer getValue() {
+ return mm.value();
+ }
+ }
+
+ private static class CodahaleCounterWrapper implements MetricsVariable<Integer> {
+ private final MutableCounterInt mm;
+
+ public CodahaleCounterWrapper(MutableCounterInt mm) {
+ this.mm = mm;
+ }
+
+ @Override
+ public Integer getValue() {
+ return mm.value();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index f0e620c..96158fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -28,6 +28,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -50,6 +51,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -70,6 +72,8 @@ import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
import org.apache.hadoop.hive.ql.wm.Trigger;
import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
import org.apache.hadoop.hive.ql.wm.WmContext;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hive.common.util.Ref;
import org.apache.tez.dag.api.TezConfiguration;
import org.codehaus.jackson.annotate.JsonAutoDetect;
@@ -107,8 +111,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private final String yarnQueue;
private final int amRegistryTimeoutMs;
private final boolean allowAnyPool;
+ private final MetricsSystem metricsSystem;
// Note: it's not clear that we need to track this - unlike PoolManager we don't have non-pool
- // sessions, so the pool itself could internally track the sessions it gave out, since
+ // sessions, so the pool itself could internally track the sessions it gave out, since
// calling close on an unopened session is probably harmless.
private final IdentityHashMap<TezSessionPoolSession, Boolean> openSessions =
new IdentityHashMap<>();
@@ -216,6 +221,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
.setDaemon(true).setNameFormat("Workload management timeout thread").build());
allowAnyPool = HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC);
+ if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_POOL_METRICS)) {
+ metricsSystem = DefaultMetricsSystem.instance();
+ } else {
+ metricsSystem = null;
+ }
wmThread = new Thread(() -> runWmThread(), "Workload management master");
wmThread.setDaemon(true);
@@ -755,7 +765,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
WmEvent moveEvent = new WmEvent(WmEvent.EventType.MOVE);
// remove from src pool
RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
- moveSession.srcSession, poolsToRedistribute, true);
+ moveSession.srcSession, poolsToRedistribute, true, true);
if (rr == RemoveSessionResult.OK) {
// check if there is capacity in dest pool, if so move else kill the session
if (capacityAvailable(destPoolName)) {
@@ -859,7 +869,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
reuseRequest.future.setException(new AssertionError("Invalid reuse attempt"));
}
session.setQueryId(null);
- return checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, isReturn);
+ return checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, isReturn, true);
}
private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session,
@@ -876,7 +886,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// anything. Instead, we will try to give out an existing session from the pool, and restart
// the problematic one in background.
String poolName = session.getPoolName();
- RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, false);
+ // Do not update metrics, we'd immediately add the session back if we are able to remove.
+ RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
+ session, poolsToRedistribute, false, false);
switch (rr) {
case OK:
// If pool didn't exist, checkAndRemoveSessionFromItsPool wouldn't have returned OK.
@@ -885,6 +897,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
session.getWmContext(), session.extractHiveResources());
// We have just removed the session from the same pool, so don't check concurrency here.
pool.initializingSessions.add(sw);
+ // Do not update metrics - see above.
sw.start();
syncWork.toRestartInUse.add(session);
return;
@@ -920,10 +933,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// This session is bad, so don't allow reuse; just convert it to normal get.
reuseRequest.sessionToReuse = null;
}
- // TODO: we should communicate this to the user more explicitly (use kill query API, or
- // add an option for bg kill checking to TezTask/monitor?
+
// We are assuming the update-error AM is bad and just try to kill it.
- RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, null);
+ RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
+ session, poolsToRedistribute, null, true);
switch (rr) {
case OK:
case NOT_FOUND:
@@ -989,7 +1002,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
PoolState state = oldPools == null ? null : oldPools.remove(fullName);
if (state == null) {
- state = new PoolState(fullName, qp, fraction, pool.getSchedulingPolicy());
+ state = new PoolState(fullName, qp, fraction, pool.getSchedulingPolicy(), metricsSystem);
} else {
// This will also take care of the queries if query parallelism changed.
state.update(qp, fraction, syncWork, e, pool.getSchedulingPolicy());
@@ -1001,6 +1014,12 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
totalQueryParallelism += qp;
}
}
+ for (PoolState pool : pools.values()) {
+ if (pool.metrics != null) {
+ pool.metrics.setMaxExecutors(
+ allocationManager.translateAllocationToCpus(pool.finalFractionRemaining));
+ }
+ }
// TODO: in the current impl, triggers are added to RP. For tez, no pool triggers (mapping between trigger name and
// pool name) will exist which means all triggers applies to tez. For LLAP, pool triggers has to exist for attaching
// triggers to specific pools.
@@ -1094,12 +1113,18 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
String oldPoolName = req.sessionToReuse.getPoolName();
oldPool = pools.get(oldPoolName);
RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
- req.sessionToReuse, poolsToRedistribute, true);
+ req.sessionToReuse, poolsToRedistribute, true, false);
if (rr != RemoveSessionResult.OK) {
+ if (oldPool.metrics != null) {
+ oldPool.metrics.removeRunningQueries(1);
+ }
// Abandon the reuse attempt.
returnSessionOnFailedReuse(req, syncWork, null);
req.sessionToReuse = null;
} else if (pool.getTotalActiveSessions() + pool.queue.size() >= pool.queryParallelism) {
+ if (oldPool.metrics != null) {
+ oldPool.metrics.removeRunningQueries(1);
+ }
// One cannot simply reuse the session if there are other queries waiting; to maintain
// fairness, we'll try to take a query slot instantly, and if that fails we'll return
// this session back to the pool and give the user a new session later.
@@ -1113,6 +1138,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
req.sessionToReuse.setPoolName(poolName);
req.sessionToReuse.setQueueName(yarnQueue);
req.sessionToReuse.setQueryId(req.queryId);
+ // Do not update metrics - we didn't update on removal.
pool.sessions.add(req.sessionToReuse);
if (pool != oldPool) {
poolsToRedistribute.add(poolName);
@@ -1123,6 +1149,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// Otherwise, queue the session and make sure we update this pool.
pool.queue.addLast(req);
+ if (pool.metrics != null) {
+ pool.metrics.addQueuedQuery();
+ }
poolsToRedistribute.add(poolName);
}
@@ -1134,7 +1163,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// 1. First, start the queries from the queue.
int queriesToStart = Math.min(pool.queue.size(),
- pool.queryParallelism - pool.getTotalActiveSessions());
+ pool.queryParallelism - pool.getTotalActiveSessions());
if (queriesToStart > 0) {
LOG.info("Starting {} queries in pool {}", queriesToStart, pool);
@@ -1145,6 +1174,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
for (int i = 0; i < queriesToStart; ++i) {
GetRequest queueReq = pool.queue.pollFirst();
+ if (pool.metrics != null) {
+ pool.metrics.moveQueuedToRunning();
+ }
assert queueReq.sessionToReuse == null;
// Note that in theory, we are guaranteed to have a session waiting for us here, but
// the expiration, failures, etc. may cause one to be missing pending restart.
@@ -1170,7 +1202,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// logic to be consistent between all the separate calls in one master thread processing round.
// Note: If allocation manager does not have cluster state, it won't update anything. When the
// cluster state changes, it will notify us, and we'd update the queries again.
- allocationManager.updateSessionsAsync(totalAlloc, pool.sessions);
+ int cpusAllocated = allocationManager.updateSessionsAsync(totalAlloc, pool.sessions);
+ if (pool.metrics != null) {
+ pool.metrics.setExecutors(cpusAllocated);
+ if (cpusAllocated > 0) {
+ // Update max executors now that cluster info is definitely available.
+ pool.metrics.setMaxExecutors(allocationManager.translateAllocationToCpus(totalAlloc));
+ }
+ }
}
private void returnSessionOnFailedReuse(
@@ -1181,7 +1220,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
session.setQueryId(null);
if (poolsToRedistribute != null) {
RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
- session, poolsToRedistribute, true);
+ session, poolsToRedistribute, true, true);
// The session cannot have been killed just now; this happens after all the kills in
// the current iteration, so we would have cleared sessionToReuse when killing this.
boolean isOk = (rr == RemoveSessionResult.OK);
@@ -1217,8 +1256,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
* thread (so we are dealing with an outdated request); null if the session should be
* in WM but wasn't found in the requisite pool (internal error?).
*/
- private RemoveSessionResult checkAndRemoveSessionFromItsPool(
- WmTezSession session, Set<String> poolsToRedistribute, Boolean isSessionOk) {
+ private RemoveSessionResult checkAndRemoveSessionFromItsPool(WmTezSession session,
+ Set<String> poolsToRedistribute, Boolean isSessionOk, boolean updateMetrics) {
// It is possible for some request to be queued after a main thread has decided to kill this
// session; on the next iteration, we'd be processing that request with an irrelevant session.
if (session.isIrrelevantForWm()) {
@@ -1237,6 +1276,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
PoolState pool = pools.get(poolName);
session.clearWm();
if (pool != null && pool.sessions.remove(session)) {
+ if (updateMetrics && pool.metrics != null) {
+ pool.metrics.removeRunningQueries(1);
+ }
return RemoveSessionResult.OK;
}
}
@@ -1255,6 +1297,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
PoolState destPool = pools.get(destPoolName);
if (destPool != null && destPool.sessions.add(session)) {
+ if (destPool.metrics != null) {
+ destPool.metrics.addRunningQuery();
+ }
session.setPoolName(destPoolName);
updateTriggers(session);
poolsToRedistribute.add(destPoolName);
@@ -1675,6 +1720,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// Note: the list is expected to be a few items; if it's longer we may want an IHM.
private final LinkedList<WmTezSession> sessions = new LinkedList<>();
private final LinkedList<GetRequest> queue = new LinkedList<>();
+ private final WmPoolMetrics metrics;
private final String fullName;
private double finalFraction;
@@ -1684,8 +1730,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private WMPoolSchedulingPolicy schedulingPolicy;
public PoolState(String fullName, int queryParallelism, double fraction,
- String schedulingPolicy) {
+ String schedulingPolicy, MetricsSystem ms) {
this.fullName = fullName;
+ // TODO: this actually calls the metrics system and getMetrics - that may be expensive.
+ // For now it looks like it should be ok to do on WM thread.
+ this.metrics = ms == null ? null : WmPoolMetrics.create(fullName, ms);
update(queryParallelism, fraction, null, null, schedulingPolicy);
}
@@ -1697,6 +1746,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
WmThreadSyncWork syncWork, EventState e, String schedulingPolicy) {
this.finalFraction = this.finalFractionRemaining = fraction;
this.queryParallelism = queryParallelism;
+ if (metrics != null) {
+ metrics.setParallelQueries(queryParallelism);
+ }
try {
this.schedulingPolicy = MetaStoreUtils.parseSchedulingPolicy(schedulingPolicy);
} catch (IllegalArgumentException ex) {
@@ -1716,6 +1768,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// We will requeue, and not kill, the queries that are not running yet.
// Insert them all before the get requests from this iteration.
GetRequest req;
+ if (metrics != null) {
+ metrics.removeQueuedQueries(queue.size());
+ }
+
while ((req = queue.pollLast()) != null) {
e.getRequests.addFirst(req);
}
@@ -1727,6 +1783,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// All the pending get requests should just be requeued elsewhere.
// Note that we never queue session reuse so sessionToReuse would be null.
globalQueue.addAll(0, queue);
+ if (metrics != null) {
+ metrics.removeQueuedQueries(queue.size());
+ metrics.destroy();
+ }
queue.clear();
}
@@ -1774,6 +1834,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private void extractAllSessionsToKill(String killReason,
IdentityHashMap<WmTezSession, GetRequest> toReuse,
WmThreadSyncWork syncWork) {
+ int totalCount = sessions.size() + initializingSessions.size();
for (WmTezSession sessionToKill : sessions) {
resetRemovedSessionToKill(syncWork.toKillQuery,
new KillQueryContext(sessionToKill, killReason), toReuse);
@@ -1791,6 +1852,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
new KillQueryContext(sessionToKill, killReason), toReuse);
}
initializingSessions.clear();
+ if (metrics != null) {
+ metrics.removeRunningQueries(totalCount);
+ }
}
public void setTriggers(final LinkedList<Trigger> triggers) {
http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
index a14cdb6..b0c1659 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
@@ -94,6 +94,7 @@ class TezProgressMonitor implements ProgressMonitor {
if (progress != null) {
// Map 1 .......... container SUCCEEDED 7 7 0 0 0 0
+ // TODO: can we pass custom things thru the progress?
results.add(
Arrays.asList(
getNameWithProgress(vertexName, progress.succeededTaskCount, progress.totalTaskCount),
http://git-wip-us.apache.org/repos/asf/hive/blob/cf6b63ee/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 20a5947..fb32e90 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -107,8 +107,9 @@ public class TestWorkloadManager {
}
@Override
- public void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions) {
+ public int updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions) {
isCalled = true;
+ return 0;
}
@Override
@@ -123,6 +124,11 @@ public class TestWorkloadManager {
@Override
public void setClusterChangedCallback(Runnable clusterChangedCallback) {
}
+
+ @Override
+ public int translateAllocationToCpus(double allocation) {
+ return 0; // Test stub: always reports zero CPUs.
+ }
}
public static WMResourcePlan plan() {