You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by bs...@apache.org on 2019/03/25 19:31:50 UTC
[hive] branch master updated: HIVE-21204 (Addendum):
Instrumentation for read/write locks in LLAP (Olli Draese via Slim
Bouguerra)
This is an automated email from the ASF dual-hosted git repository.
bslim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new b01258b HIVE-21204 (Addendum): Instrumentation for read/write locks in LLAP (Olli Draese via Slim Bouguerra)
b01258b is described below
commit b01258b7a592033fdae4dc2deefe2e2f5cfd740d
Author: Olli Draese <od...@cloudera.com>
AuthorDate: Mon Mar 25 12:31:35 2019 -0700
HIVE-21204 (Addendum): Instrumentation for read/write locks in LLAP (Olli Draese via Slim Bouguerra)
---
.../hive/llap/metrics/ReadWriteLockMetrics.java | 431 +++++++++++++++++
.../llap/metrics/TestReadWriteLockMetrics.java | 513 +++++++++++++++++++++
.../daemon/services/impl/LlapLockingServlet.java | 134 ++++++
3 files changed, 1078 insertions(+)
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/metrics/ReadWriteLockMetrics.java b/llap-common/src/java/org/apache/hadoop/hive/llap/metrics/ReadWriteLockMetrics.java
new file mode 100644
index 0000000..7d52a15
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/metrics/ReadWriteLockMetrics.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.metrics;
+
+import avro.shaded.com.google.common.annotations.VisibleForTesting;
+
+import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * Wrapper around a read/write lock to collect the lock wait times.
+ * Instances of this wrapper class can be used to collect/accumulate the wait
+ * times around R/W locks. This is helpful if the source of a performance issue
+ * might be related to lock contention and you need to identify the actual
+ * locks. Instances of this class can be wrapped around any <code>ReadWriteLock
+ * </code> implementation.
+ */
+public class ReadWriteLockMetrics implements ReadWriteLock {
+ private LockWrapper readLock; ///< wrapper around original read lock
+ private LockWrapper writeLock; ///< wrapper around original write lock
+
+  /**
+   * Orders <code>LockMetricSource</code> instances for reporting.
+   * Sources with a higher combined (read + write) lock wait time, measured in
+   * whole milliseconds, sort first; sources with the same wait time are
+   * ordered ascending by their label. Any pair that involves <code>null</code>
+   * or a <code>MetricsSource</code> of another type compares as equal.
+   */
+  public static class MetricsComparator implements Comparator<MetricsSource>, Serializable {
+    private static final long serialVersionUID = -1;
+
+    @Override
+    public int compare(MetricsSource o1, MetricsSource o2) {
+      // instanceof is false for null, so this also covers missing operands
+      if (!(o1 instanceof LockMetricSource) || !(o2 instanceof LockMetricSource)) {
+        return 0;
+      }
+
+      LockMetricSource left = (LockMetricSource)o1;
+      LockMetricSource right = (LockMetricSource)o2;
+      long leftMillis = totalWaitMillis(left);
+      long rightMillis = totalWaitMillis(right);
+
+      if (leftMillis != rightMillis) {
+        // descending: the source with the larger wait time comes first
+        return (leftMillis > rightMillis) ? -1 : 1;
+      }
+
+      // same wait time: fall back to ascending label order
+      return left.lockLabel.compareTo(right.lockLabel);
+    }
+
+    /// Combined read and write lock wait time of a source in milliseconds
+    private static long totalWaitMillis(LockMetricSource source) {
+      return (source.readLockWaitTimeTotal.value() / 1000000L)
+          + (source.writeLockWaitTimeTotal.value() / 1000000L);
+    }
+  }
+
+  /**
+   * Wraps a <code>ReadWriteLock</code> into a monitored lock if required by
+   * configuration. This helper is checking the <code>
+   * hive.llap.lockmetrics.collect</code> configuration option and wraps the
+   * passed in <code>ReadWriteLock</code> into a monitoring container if the
+   * option is set to <code>true</code>. Otherwise, the original (passed in)
+   * lock instance is returned unmodified.
+   *
+   * @param conf Configuration instance to check for LLAP conf options; with
+   *             <code>null</code> the lock is never wrapped
+   * @param lock The <code>ReadWriteLock</code> to wrap for monitoring
+   * @param metrics The target container for locking metrics; only inspected
+   *                when wrapping is enabled
+   * @return Either a new <code>ReadWriteLockMetrics</code> around the lock or
+   *         the passed in lock itself
+   * @throws NullPointerException If the lock is <code>null</code>, or if
+   *         wrapping is enabled and no metrics source was passed in
+   * @throws IllegalArgumentException If wrapping is enabled and the metrics
+   *         source was not created via <code>createLockMetricsSource</code>
+   * @see #createLockMetricsSource
+   */
+  public static ReadWriteLock wrap(Configuration conf, ReadWriteLock lock,
+                                   MetricsSource metrics) {
+    Preconditions.checkNotNull(lock, "Caller has to provide valid input lock");
+
+    // without a configuration there is nothing that could enable monitoring
+    boolean needsWrap = (null != conf)
+        && HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_COLLECT_LOCK_METRICS);
+
+    if (!needsWrap) {
+      return lock;
+    }
+
+    Preconditions.checkNotNull(metrics,
+        "Caller has to provide group specific metrics source");
+    return new ReadWriteLockMetrics(lock, metrics);
+  }
+
+ /**
+ * Factory method for new metric collections.
+ * You can create and use a single <code>MetricsSource</code> collection for
+ * multiple R/W locks. This makes sense if several locks belong to a single
+ * group and you're then interested in the accumulated values for the whole
+ * group, rather than the single lock instance. The passed in label is
+ * supposed to identify the group uniquely.
+ *
+ * @param label The group identifier for lock statistics
+ */
+ public static MetricsSource createLockMetricsSource(String label) {
+ Preconditions.checkNotNull(label);
+ Preconditions.checkArgument(!label.contains("\""),
+ "Label can't contain quote (\")");
+ return new LockMetricSource(label);
+ }
+
+  /**
+   * Lists every <code>MetricsSource</code> that was created for R/W lock
+   * metrics so far, i.e. all instances previously handed out by the <code>
+   * createLockMetricsSource</code> function. The result is a copy, so the
+   * caller may sort or modify it freely.
+   *
+   * @return A list of all R/W lock based metrics sources
+   */
+  public static List<MetricsSource> getAllMetricsSources() {
+    // copy while holding the same monitor that guards additions to the list
+    synchronized (LockMetricSource.allInstances) {
+      return new ArrayList<>(LockMetricSource.allInstances);
+    }
+  }
+
+  /// Names and descriptions of the metrics reported per lock group
+  @VisibleForTesting
+  public enum LockMetricInfo implements MetricsInfo {
+    ReadLockWaitTimeTotal("The total wait time for read locks in nanoseconds"),
+    ReadLockWaitTimeMax("The maximum wait time for a read lock in nanoseconds"),
+    ReadLockCount("Total amount of read lock requests"),
+    WriteLockWaitTimeTotal("The total wait time for write locks in nanoseconds"),
+    WriteLockWaitTimeMax("The maximum wait time for a write lock in nanoseconds"),
+    WriteLockCount("Total amount of write lock requests");
+
+    private final String text; ///< human readable metric description
+
+    /**
+     * Binds the description text to the enumeration constant.
+     *
+     * @param text The description of the info
+     */
+    LockMetricInfo(String text) {
+      this.text = text;
+    }
+
+    @Override
+    public String description() {
+      return text;
+    }
+  }
+
+  /**
+   * Source of the accumulated lock times and counts.
+   * Instances of this <code>MetricsSource</code> can be created via the static
+   * factory method <code>createLockMetricsSource</code> and shared across
+   * multiple instances of the outer <code>ReadWriteLockMetrics</code> class.
+   * Every created instance is added to the static <code>allInstances</code>
+   * list; nothing in this class removes entries from that list again.
+   */
+  @Metrics(about = "Lock Metrics", context = "locking")
+  private static class LockMetricSource implements MetricsSource {
+    /// all sources created so far; access is guarded by the list's monitor
+    private static final ArrayList<MetricsSource> allInstances = new ArrayList<>();
+
+    private final String lockLabel; ///< identifier for the group of locks
+
+    /// accumulated wait time for read locks
+    @Metric
+    MutableCounterLong readLockWaitTimeTotal;
+
+    /// highest wait time for read locks
+    @Metric
+    MutableCounterLong readLockWaitTimeMax;
+
+    /// total number of read lock calls
+    @Metric
+    MutableCounterLong readLockCounts;
+
+    /// accumulated wait time for write locks
+    @Metric
+    MutableCounterLong writeLockWaitTimeTotal;
+
+    /// highest wait time for write locks
+    @Metric
+    MutableCounterLong writeLockWaitTimeMax;
+
+    /// total number of write lock calls
+    @Metric
+    MutableCounterLong writeLockCounts;
+
+    /**
+     * Creates a new metrics collection instance.
+     * Several locks can share a single <code>MetricsSource</code> instance
+     * where all of them increment the metrics counts together. This can be
+     * interesting to have a single instance for a group of related locks. The
+     * group should then be identified by the label.
+     *
+     * @param label The identifier of the metrics collection
+     */
+    private LockMetricSource(String label) {
+      lockLabel = label;
+      readLockWaitTimeTotal
+          = new MutableCounterLong(LockMetricInfo.ReadLockWaitTimeTotal, 0);
+      readLockWaitTimeMax
+          = new MutableCounterLong(LockMetricInfo.ReadLockWaitTimeMax, 0);
+      readLockCounts
+          = new MutableCounterLong(LockMetricInfo.ReadLockCount, 0);
+      writeLockWaitTimeTotal
+          = new MutableCounterLong(LockMetricInfo.WriteLockWaitTimeTotal, 0);
+      writeLockWaitTimeMax
+          = new MutableCounterLong(LockMetricInfo.WriteLockWaitTimeMax, 0);
+      writeLockCounts
+          = new MutableCounterLong(LockMetricInfo.WriteLockCount, 0);
+
+      // register last, once all counters exist
+      synchronized (allInstances) {
+        allInstances.add(this);
+      }
+    }
+
+    /**
+     * Adds one record, named after the lock label, with the current value of
+     * all six counters to the collector.
+     *
+     * @param collector The collector that receives the record
+     * @param all Not evaluated; all counters are always reported
+     */
+    @Override
+    public void getMetrics(MetricsCollector collector, boolean all) {
+      collector.addRecord(this.lockLabel)
+               .setContext("Locking")
+               .addCounter(LockMetricInfo.ReadLockWaitTimeTotal,
+                           readLockWaitTimeTotal.value())
+               .addCounter(LockMetricInfo.ReadLockWaitTimeMax,
+                           readLockWaitTimeMax.value())
+               .addCounter(LockMetricInfo.ReadLockCount,
+                           readLockCounts.value())
+               .addCounter(LockMetricInfo.WriteLockWaitTimeTotal,
+                           writeLockWaitTimeTotal.value())
+               .addCounter(LockMetricInfo.WriteLockWaitTimeMax,
+                           writeLockWaitTimeMax.value())
+               .addCounter(LockMetricInfo.WriteLockCount,
+                           writeLockCounts.value());
+    }
+
+    /**
+     * Renders the counters as a small JSON document.
+     *
+     * @return JSON text with the label, the combined wait time in
+     *         milliseconds and count/average/max (nanoseconds) per lock type
+     */
+    @Override
+    public String toString() {
+      // Read every counter exactly once. The counters may be incremented
+      // concurrently, and the derived values (average, total) should be
+      // computed from the very same numbers that are printed.
+      final long readCount = readLockCounts.value();
+      final long readTotal = readLockWaitTimeTotal.value();
+      final long readMax = readLockWaitTimeMax.value();
+      final long writeCount = writeLockCounts.value();
+      final long writeTotal = writeLockWaitTimeTotal.value();
+      final long writeMax = writeLockWaitTimeMax.value();
+
+      // averages stay zero as long as no lock call was counted
+      final long avgRead = (0 < readCount) ? (readTotal / readCount) : 0L;
+      final long avgWrite = (0 < writeCount) ? (writeTotal / writeCount) : 0L;
+      final long totalMillis = (readTotal / 1000000L) + (writeTotal / 1000000L);
+
+      // local, single threaded buffer: no need for a synchronized StringBuffer
+      StringBuilder sb = new StringBuilder();
+      sb.append("{ \"type\" : \"R/W Lock Stats\", \"label\" : \"");
+      sb.append(lockLabel);
+      sb.append("\", \"totalLockWaitTimeMillis\" : ");
+      sb.append(totalMillis);
+      sb.append(", \"readLock\" : { \"count\" : ");
+      sb.append(readCount);
+      sb.append(", \"avgWaitTimeNanos\" : ");
+      sb.append(avgRead);
+      sb.append(", \"maxWaitTimeNanos\" : ");
+      sb.append(readMax);
+      sb.append(" }, \"writeLock\" : { \"count\" : ");
+      sb.append(writeCount);
+      sb.append(", \"avgWaitTimeNanos\" : ");
+      sb.append(avgWrite);
+      sb.append(", \"maxWaitTimeNanos\" : ");
+      sb.append(writeMax);
+      sb.append(" } }");
+
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Inner helper class to wrap the original lock with a monitored one.
+   * This inner class is delegating all actual locking operations to the wrapped
+   * lock, while itself is only responsible to measure the time that it took to
+   * acquire a specific lock. The counters are passed in from the outside and
+   * may be shared with other wrappers that report into the same metrics source.
+   */
+  private static class LockWrapper implements Lock {
+    /// the lock to delegate the work to
+    private final Lock wrappedLock;
+    /// total lock wait time in nanos
+    private final MutableCounterLong lockWaitTotal;
+    /// highest lock wait time (max)
+    private final MutableCounterLong lockWaitMax;
+    /// number of lock counts
+    private final MutableCounterLong lockWaitCount;
+
+    /**
+     * Creates a new wrapper around an existing lock.
+     *
+     * @param original The original lock to wrap by this monitoring lock
+     * @param total The (atomic) counter to increment for total lock wait time
+     * @param max The (atomic) counter to adjust to the maximum wait time
+     * @param cnt The (atomic) counter to increment with each lock call
+     */
+    LockWrapper(Lock original, MutableCounterLong total,
+                MutableCounterLong max, MutableCounterLong cnt) {
+      wrappedLock = original;
+      this.lockWaitTotal = total;
+      this.lockWaitMax = max;
+      this.lockWaitCount = cnt;
+    }
+
+    /// Acquires the wrapped lock and accounts the time spent waiting for it
+    @Override
+    public void lock() {
+      long start = System.nanoTime();
+      wrappedLock.lock();
+      incrementBy(System.nanoTime() - start);
+    }
+
+    /// Like <code>lock</code>; nothing is accounted if the wait is interrupted
+    @Override
+    public void lockInterruptibly() throws InterruptedException {
+      long start = System.nanoTime();
+      wrappedLock.lockInterruptibly();
+      incrementBy(System.nanoTime() - start);
+    }
+
+    /// Pure delegation; the call is neither timed nor counted
+    @Override
+    public boolean tryLock() {
+      return wrappedLock.tryLock();
+    }
+
+    /// Timed attempt; the elapsed time is accounted whether or not it succeeded
+    @Override
+    public boolean tryLock(long time, TimeUnit unit)
+        throws InterruptedException {
+      long start = System.nanoTime();
+      boolean ret = wrappedLock.tryLock(time, unit);
+      incrementBy(System.nanoTime() - start);
+      return ret;
+    }
+
+    @Override
+    public void unlock() {
+      wrappedLock.unlock();
+    }
+
+    @Override
+    public Condition newCondition() {
+      return wrappedLock.newCondition();
+    }
+
+    /**
+     * Helper to increment the monitoring counters.
+     * Called from the lock implementations to increment the total/max/count
+     * values of the monitoring counters.
+     *
+     * @param waitTime The actual wait time (in nanos) for the lock operation
+     */
+    private void incrementBy(long waitTime) {
+      this.lockWaitTotal.incr(waitTime);
+      this.lockWaitCount.incr();
+
+      // The maximum is kept in a counter and raised by adding the difference
+      // to the new value. Reading the old value and adding the difference are
+      // two steps, so two threads raising it concurrently could add both of
+      // their differences and overshoot. The maximum never decreases, hence
+      // the cheap unguarded check is enough to skip the common case; the
+      // actual update re-reads the value under the monitor of the counter,
+      // which is the one object all wrappers of a metrics source share.
+      if (waitTime > this.lockWaitMax.value()) {
+        synchronized (this.lockWaitMax) {
+          long currentMax = this.lockWaitMax.value();
+
+          if (waitTime > currentMax) {
+            this.lockWaitMax.incr(waitTime - currentMax);
+          }
+        }
+      }
+    }
+  }
+
+ /**
+ * Creates a new monitoring wrapper around a R/W lock.
+ * The so created wrapper instance can be used instead of the original R/W
+ * lock, which then automatically updates the monitoring values in the <code>
+ * MetricsSource</code>. This allows easy "slide in" of lock monitoring where
+ * originally only a standard R/W lock was used.
+ *
+ * @param lock The original R/W lock to wrap for monitoring
+ * @param metrics The target for lock monitoring
+ */
+ private ReadWriteLockMetrics(ReadWriteLock lock, MetricsSource metrics) {
+ Preconditions.checkNotNull(lock);
+ Preconditions.checkArgument(metrics instanceof LockMetricSource,
+ "Invalid MetricsSource");
+
+ LockMetricSource lms = (LockMetricSource)metrics;
+ readLock = new LockWrapper(lock.readLock(), lms.readLockWaitTimeTotal,
+ lms.readLockWaitTimeMax, lms.readLockCounts);
+ writeLock = new LockWrapper(lock.writeLock(), lms.writeLockWaitTimeTotal,
+ lms.writeLockWaitTimeMax, lms.writeLockCounts);
+ }
+
+  /// Returns the monitored wrapper around the original read lock
+  @Override
+  public Lock readLock() {
+    return this.readLock;
+  }
+
+  /// Returns the monitored wrapper around the original write lock
+  @Override
+  public Lock writeLock() {
+    return this.writeLock;
+  }
+}
diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/metrics/TestReadWriteLockMetrics.java b/llap-common/src/test/org/apache/hadoop/hive/llap/metrics/TestReadWriteLockMetrics.java
new file mode 100644
index 0000000..9784376
--- /dev/null
+++ b/llap-common/src/test/org/apache/hadoop/hive/llap/metrics/TestReadWriteLockMetrics.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.metrics;
+
+import static java.lang.Math.max;
+import static java.lang.System.nanoTime;
+import static org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics.LockMetricInfo.ReadLockCount;
+import static org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics.LockMetricInfo.ReadLockWaitTimeMax;
+import static org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics.LockMetricInfo.ReadLockWaitTimeTotal;
+import static org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics.LockMetricInfo.WriteLockCount;
+import static org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics.LockMetricInfo.WriteLockWaitTimeMax;
+import static org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics.LockMetricInfo.WriteLockWaitTimeTotal;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.junit.Test;
+
+/**
+ * JUnit test suite for the <code>ReadWriteLockMetrics</code> class.
+ * The test uses a background thread and has some hard coded thread execution
+ * times. It should normally not take more than 2 threads and 400ms execution
+ * time.
+ */
+public class TestReadWriteLockMetrics {
+  /**
+   * Thread which performs locks in loop, holding the lock for 5ms.
+   * The thread measures on its own how long each <code>lock()</code> call took,
+   * so the test can compare these numbers with the reported metrics.
+   */
+  private static class LockHolder extends Thread {
+    public static final long LOCK_HOLD_TIME = 5;   ///< lock hold time in ms
+
+    private final Lock targetLock;                 ///< the lock to hold
+    private long lockCount;                        ///< loop count
+    private long lockWaitSum;                      ///< total lock wait time
+    private long lockWaitMax;                      ///< highest lock wait time
+    private long endTime;                          ///< runtime for the thread
+
+    /**
+     * Create a new lock holding thread.
+     * The so created thread starts immediately.
+     *
+     * @param l The lock to lock/unlock in loop
+     * @param ttl The expected thread run time in ms
+     */
+    public LockHolder(Lock l, long ttl) {
+      targetLock = l;
+
+      lockCount = 0;
+      lockWaitSum = 0;
+      lockWaitMax = 0;
+      endTime = ttl;
+
+      setName(getClass().getSimpleName());
+      setDaemon(true);
+      start();
+    }
+
+    /**
+     * Returns the number of counted locks.
+     * @return The total lock loop execution count
+     */
+    public long getLockCount() {
+      return lockCount;
+    }
+
+    /**
+     * Returns the accumulated nano seconds for locks.
+     * @return The aggregated time, the thread was waiting on locks (in nanos)
+     */
+    public long getLockSum() {
+      return lockWaitSum;
+    }
+
+    /**
+     * Returns the highest lock time in nano seconds.
+     * @return The highest (single) lock wait time (in nanos)
+     */
+    public long getLockMax() {
+      return lockWaitMax;
+    }
+
+    @Override
+    public void run() {
+      // the field held the ttl in ms so far; turn it into an absolute deadline
+      endTime = nanoTime() + toNano(endTime);
+
+      // loop for specified amount of time
+      while (nanoTime() <= endTime && !isInterrupted()) {
+        long start = nanoTime();
+
+        // acquire before entering try: if lock() itself fails, the finally
+        // block must not try to release a lock that was never obtained
+        targetLock.lock();
+
+        try {
+          ++lockCount;
+          long diff = nanoTime() - start;
+          lockWaitSum += diff;
+          lockWaitMax = max(diff, lockWaitMax);
+
+          while (nanoTime() <= (start + toNano(LOCK_HOLD_TIME))) {
+            // spin for LOCK_HOLD_TIME ms (under lock)
+          }
+        } finally {
+          targetLock.unlock();
+        }
+      }
+    }
+  }
+
+  /**
+   * Mock metrics collector for this test only.
+   * The collector keeps every record that a <code>MetricsSource</code> adds, so
+   * the test can read back the values reported by the <code>
+   * ReadWriteLockMetrics</code>. Only records, their context and counters are
+   * supported; every other builder operation fails the test.
+   */
+  private static class MockMetricsCollector implements MetricsCollector {
+    private final List<MockRecord> records = new ArrayList<>();
+
+    /**
+     * Single metrics record mock implementation.
+     */
+    public static class MockRecord {
+      private final String recordLabel;   ///< record tag/label
+      private final Map<MetricsInfo,Number> metrics = new HashMap<>();   ///< metrics within record
+      private String context;             ///< collector context ID
+
+      /**
+       * @param label metrics record label.
+       */
+      public MockRecord(String label) {
+        this.recordLabel = label;
+      }
+
+      /**
+       * @return The record's tag/label.
+       */
+      public String getLabel() {
+        return this.recordLabel;
+      }
+
+      /**
+       * @return The context of the collector.
+       */
+      public String getContext() {
+        return this.context;
+      }
+
+      /**
+       * @return Map of identifier/metric value pairs.
+       */
+      public Map<MetricsInfo,Number> getMetrics() {
+        return this.metrics;
+      }
+    }
+
+    /**
+     * Record builder mock implementation.
+     */
+    private class MockMetricsRecordBuilder extends MetricsRecordBuilder {
+      private MockRecord target = null;   ///< the record that is populated
+
+      /**
+       * Used by outer class to provide a new <code>MetricsRecordBuilder</code>
+       * for a single metrics record.
+       *
+       * @param t The record to build.
+       */
+      public MockMetricsRecordBuilder(MockRecord t) {
+        this.target = t;
+      }
+
+      @Override
+      public MetricsRecordBuilder add(MetricsTag tag) {
+        throw new AssertionError("Not implemented for test");
+      }
+
+      @Override
+      public MetricsRecordBuilder add(AbstractMetric metric) {
+        throw new AssertionError("Not implemented for test");
+      }
+
+      @Override
+      public MetricsRecordBuilder addCounter(MetricsInfo info, int value) {
+        this.target.getMetrics().put(info, value);
+        return this;
+      }
+
+      @Override
+      public MetricsRecordBuilder addCounter(MetricsInfo info, long value) {
+        this.target.getMetrics().put(info, value);
+        return this;
+      }
+
+      @Override
+      public MetricsRecordBuilder addGauge(MetricsInfo info, int value) {
+        throw new AssertionError("Not implemented for test");
+      }
+
+      @Override
+      public MetricsRecordBuilder addGauge(MetricsInfo info, long value) {
+        throw new AssertionError("Not implemented for test");
+      }
+
+      @Override
+      public MetricsRecordBuilder addGauge(MetricsInfo info, float value) {
+        throw new AssertionError("Not implemented for test");
+      }
+
+      @Override
+      public MetricsRecordBuilder addGauge(MetricsInfo info, double value) {
+        throw new AssertionError("Not implemented for test");
+      }
+
+      @Override
+      public MetricsCollector parent() {
+        return MockMetricsCollector.this;
+      }
+
+      @Override
+      public MetricsRecordBuilder setContext(String ctx) {
+        this.target.context = ctx;
+        return this;
+      }
+
+      @Override
+      public MetricsRecordBuilder tag(MetricsInfo info, String value) {
+        throw new AssertionError("Not implemented for test");
+      }
+    }
+
+    @Override
+    public MetricsRecordBuilder addRecord(String label) {
+      return startRecord(label);
+    }
+
+    @Override
+    public MetricsRecordBuilder addRecord(MetricsInfo info) {
+      return startRecord(info.name());
+    }
+
+    /// Stores a new, empty record under the label and returns its builder
+    private MetricsRecordBuilder startRecord(String label) {
+      MockRecord fresh = new MockRecord(label);
+      this.records.add(fresh);
+      return new MockMetricsRecordBuilder(fresh);
+    }
+
+    /**
+     * @return A list of all built metrics records.
+     */
+    public List<MockRecord> getRecords() {
+      return this.records;
+    }
+  }
+
+ /**
+ * Helper to verify the actual value by comparing it with a +/- tolerance of
+ * 10% with the expected value.
+ *
+ * @param txt Assertion message
+ * @param expected The expected value (tolerance will be applied)
+ * @param actual Actual test outcome
+ */
+ private void assertWithTolerance(String txt, long expected, long actual) {
+ long lowExpected = expected - (expected / 10L);
+ long highExpected = expected + (expected / 10L);
+
+ StringBuffer msg = new StringBuffer(txt);
+ msg.append(" (expected ");
+ msg.append(lowExpected);
+ msg.append(" <= x <= ");
+ msg.append(highExpected);
+ msg.append(" but actual = ");
+ msg.append(actual);
+ msg.append(")");
+
+ assertTrue(msg.toString(), actual >= lowExpected && actual <= highExpected);
+ }
+
+ /**
+ * Helper to convert milliseconds to nanoseconds.
+ *
+ * @param ms Millisecond inpu
+ * @return Value in nanoseconds
+ */
+ private static long toNano(long ms) {
+ return ms * 1000000;
+ }
+
+ /**
+ * Helper to produce <code>ReadWriteLockMetrics</code> instances.
+ * The wrapping of lock instances is configuration dependent. This helper ensures that the
+ * configuration creates wrapped lock instances.
+ *
+ * @param lock The lock to wrap
+ * @param ms The metrics source, storing the lock measurements
+ * @return The wrapped lock
+ */
+ private ReadWriteLockMetrics create(ReadWriteLock lock, MetricsSource ms) {
+ Configuration dummyConf = new Configuration();
+
+ HiveConf.setBoolVar(dummyConf,
+ HiveConf.ConfVars.LLAP_COLLECT_LOCK_METRICS, true);
+ return (ReadWriteLockMetrics)ReadWriteLockMetrics.wrap(dummyConf, lock, ms);
+ }
+
+  /**
+   * Runs a simple test where a thread is running in a loop, getting read locks w/o having to
+   * deal with any contention. The test shows that the locks are received rather quick and that
+   * all metrics for write locks remain zero.
+   */
+  @Test
+  public void testWithoutContention() throws Exception {
+    final long execTime = 100;
+
+    MetricsSource ms = ReadWriteLockMetrics.createLockMetricsSource("test1");
+    ReadWriteLock rwl = create(new ReentrantReadWriteLock(), ms);
+    LockHolder lhR = new LockHolder(rwl.readLock(), execTime);
+
+    // wait for the thread to do its locks and waits (for 100ms)
+    lhR.join();
+
+    // get the reported metrics
+    MockMetricsCollector tmc = new MockMetricsCollector();
+    ms.getMetrics(tmc, true);
+
+    List<MockMetricsCollector.MockRecord> result = tmc.getRecords();
+    assertEquals("Unexpected amount of metrics", 1, result.size());
+    MockMetricsCollector.MockRecord rec = result.get(0);
+
+    // verify label and context (context is hard coded)
+    assertEquals("Invalid record label", "test1", rec.getLabel());
+    assertEquals("Invalid record context", "Locking", rec.getContext());
+
+    // Unbox the reported values once. Comparing the boxed Number with an int
+    // literal would compare a Long with an Integer, which are never equal, so
+    // a "not equals zero" check on the boxed value could not fail at all.
+    Map<MetricsInfo,Number> metrics = rec.getMetrics();
+    final long readCount = metrics.get(ReadLockCount).longValue();
+    final long readTotal = metrics.get(ReadLockWaitTimeTotal).longValue();
+    final long readMax = metrics.get(ReadLockWaitTimeMax).longValue();
+
+    // we expect around execTime / thread loop time executions
+    assertWithTolerance("Unexpected count of lock executions (reader)",
+        execTime / LockHolder.LOCK_HOLD_TIME, lhR.getLockCount());
+    assertEquals("Counting the locks failed", lhR.getLockCount(), readCount);
+
+    // sanity check in read lock metrics
+    assertNotEquals("Local thread should have lock time", 0L, lhR.getLockSum());
+    assertNotEquals("Accounted lock time zero", 0L, readTotal);
+    assertTrue("Local measurement larger (overhead)",
+        readTotal < lhR.getLockSum());
+
+    assertNotEquals("Local thread should have max lock time",
+        0L, lhR.getLockMax());
+    assertNotEquals("Accounted lock max time zero", 0L, readMax);
+
+    assertTrue("Local max larger (overhead)", readMax < lhR.getLockMax());
+
+    assertTrue("Max greater or equal to average lock time",
+        (readTotal / readCount) <= readMax);
+
+    assertTrue("Lock time less than 1% (no contention)",
+        readTotal < toNano(execTime / 100L));
+
+    // sanity check on write lock metrics (should be all zero)
+    assertEquals("No writer lock activity expected (total)",
+        0L, metrics.get(WriteLockWaitTimeTotal).longValue());
+    assertEquals("No writer lock activity expected (max)",
+        0L, metrics.get(WriteLockWaitTimeMax).longValue());
+    assertEquals("No writer lock activity expected (count)",
+        0L, metrics.get(WriteLockCount).longValue());
+  }
+
+  /**
+   * Test where read/write lock contention is tested.
+   * This test has a background thread that tries to get read locks within a
+   * loop while the main thread holds a write lock for half of the test
+   * execution time. The test verifies that the reported metric for read lock
+   * wait time reflects that the thread was blocked until the write lock was
+   * released. It also performs basic sanity checks on the read and write lock
+   * metrics.
+   */
+  @Test
+  public void testWithContention() throws Exception {
+    final long execTime = 200;
+
+    MetricsSource ms = ReadWriteLockMetrics.createLockMetricsSource("test1");
+    ReadWriteLock rwl = create(new ReentrantReadWriteLock(), ms);
+    LockHolder lhR = new LockHolder(rwl.readLock(), execTime);
+
+    // get a write lock for half of the execution time; the deadline is taken
+    // before the lock call, so time spent waiting for the lock counts as well
+    long endOfLock = nanoTime() + toNano(execTime / 2);
+
+    // acquire before entering try: if lock() itself fails, the finally block
+    // must not try to release a lock that was never obtained
+    rwl.writeLock().lock();
+
+    try {
+      while (nanoTime() < endOfLock) {
+        // spin until end time is reached
+      }
+    } finally {
+      rwl.writeLock().unlock();
+    }
+
+    // wait for the thread to do its locks and waits (execTime in total)
+    lhR.join();
+
+    MockMetricsCollector tmc = new MockMetricsCollector();
+    ms.getMetrics(tmc, true);
+
+    List<MockMetricsCollector.MockRecord> result = tmc.getRecords();
+    assertEquals("Unexpected amount of metrics", 1, result.size());
+    MockMetricsCollector.MockRecord rec = result.get(0);
+
+    // sanity checks for read lock values
+    assertEquals("Verifying the loop count (read lock)",
+        lhR.getLockCount(),
+        rec.getMetrics().get(ReadLockCount).longValue());
+
+    assertWithTolerance("Only half of possible read locks expected",
+        (execTime / LockHolder.LOCK_HOLD_TIME) / 2,
+        rec.getMetrics().get(ReadLockCount).longValue());
+
+    assertWithTolerance("Max read lock wait time close to write lock hold",
+        toNano(execTime / 2),
+        rec.getMetrics().get(ReadLockWaitTimeMax).longValue());
+
+    assertTrue("Total read lock wait time larger than max",
+        rec.getMetrics().get(ReadLockWaitTimeMax).longValue()
+            < rec.getMetrics().get(ReadLockWaitTimeTotal).longValue());
+
+    // sanity check for write locks
+    assertEquals("Write lock count supposed to be one",
+        1, rec.getMetrics().get(WriteLockCount).longValue());
+
+    assertTrue("Write lock wait time non zero",
+        0L < rec.getMetrics().get(WriteLockWaitTimeTotal).longValue());
+
+    // compare the primitive values rather than the boxed Number objects
+    assertEquals("With one lock, total should be max",
+        rec.getMetrics().get(WriteLockWaitTimeTotal).longValue(),
+        rec.getMetrics().get(WriteLockWaitTimeMax).longValue());
+  }
+
+  /**
+   * Checks the outcome of <code>wrap</code> for each configuration state:
+   * option not set, option set to false, option set to true and no
+   * configuration at all.
+   */
+  @Test
+  public void testWrap() throws Exception {
+    Configuration testConf = new Configuration();
+    MetricsSource ms = ReadWriteLockMetrics.createLockMetricsSource("testConf");
+
+    // default = passthrough
+    assertNotWrapped(
+        ReadWriteLockMetrics.wrap(testConf, new ReentrantReadWriteLock(), ms));
+
+    // false = pass through
+    HiveConf.setBoolVar(testConf,
+        HiveConf.ConfVars.LLAP_COLLECT_LOCK_METRICS, false);
+    assertNotWrapped(
+        ReadWriteLockMetrics.wrap(testConf, new ReentrantReadWriteLock(), ms));
+
+    // true = wrap
+    HiveConf.setBoolVar(testConf,
+        HiveConf.ConfVars.LLAP_COLLECT_LOCK_METRICS, true);
+    ReadWriteLock rwlWrap =
+        ReadWriteLockMetrics.wrap(testConf, new ReentrantReadWriteLock(), ms);
+    assertTrue("Wrapped lock expected",
+        rwlWrap instanceof ReadWriteLockMetrics);
+
+    // null = passthrough
+    assertNotWrapped(
+        ReadWriteLockMetrics.wrap(null, new ReentrantReadWriteLock(), null));
+  }
+
+  /// Asserts that the lock is the plain lock and not the monitoring wrapper
+  private void assertNotWrapped(ReadWriteLock candidate) {
+    assertTrue("Basic ReentrantReadWriteLock expected",
+        candidate instanceof ReentrantReadWriteLock);
+    assertFalse("Basic ReentrantReadWriteLock expected",
+        candidate instanceof ReadWriteLockMetrics);
+  }
+}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapLockingServlet.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapLockingServlet.java
new file mode 100644
index 0000000..86d9552
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapLockingServlet.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.services.impl;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import java.io.PrintWriter;
+import java.util.List;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hive.http.HttpServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Servlet to produce the JSON output for the locking endpoint.
+ * The servlet produces and writes a JSON document, that lists all the locking statistics,
+ * available through the <code>ReadWriteLockMetrics</code> instrumentation.
+ */
+public class LlapLockingServlet extends HttpServlet {
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(LlapLockingServlet.class);
+ private static final String ACCESS_CONTROL_ALLOW_METHODS = "Access-Control-Allow-Methods";
+ private static final String ACCESS_CONTROL_ALLOW_ORIGIN = "Access-Control-Allow-Origin";
+ private static Configuration conf = null;
+
+ /**
+ * Configuration setter, used to figure out the lock statistics collection setting.
+ *
+ * @param c The configuration to use
+ */
+ public static void setConf(Configuration c) {
+ conf = c;
+ }
+
+ @Override
+ public void init() throws ServletException {
+ LOG.info("LlapLockingServlet initialized");
+ }
+
+ @Override
+ public void doGet(HttpServletRequest request, HttpServletResponse response) {
+ try {
+ if (!HttpServer.isInstrumentationAccessAllowed(getServletContext(),
+ request, response)) {
+ response.setStatus(HttpServletResponse.SC_FORBIDDEN);
+ } else {
+ String collString = "\"disabled\"";
+ boolean statsEnabled = false;
+
+ // populate header
+ response.setContentType("application/json; charset=utf8");
+ response.setHeader(ACCESS_CONTROL_ALLOW_METHODS, "GET");
+ response.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
+ response.setHeader("Cache-Control",
+ "no-transform,public,max-age=60,s-maxage=60");
+
+ if (null != conf && HiveConf.getBoolVar(conf,
+ HiveConf.ConfVars.LLAP_COLLECT_LOCK_METRICS)) {
+ collString = "\"enabled\"";
+ statsEnabled = true;
+ }
+
+ StringBuffer result = new StringBuffer();
+ List<MetricsSource> sourceList
+ = ReadWriteLockMetrics.getAllMetricsSources();
+ if (null == sourceList) {
+ // should actually never happen
+ result.append("{\"error\":\"R/W statistics not found\"}");
+ } else {
+ sourceList.sort(new ReadWriteLockMetrics.MetricsComparator());
+ boolean first = true;
+
+ result.append("{\"statsCollection\":");
+ result.append(collString);
+ result.append(",\"lockStats\":[");
+
+ // dump an object per lock label
+ if (statsEnabled) {
+ for (MetricsSource ms : sourceList) {
+ if (!first) {
+ result.append(",");
+ }
+
+ first = false;
+ result.append(ms);
+ }
+ }
+
+ result.append("]}");
+ }
+
+ // send string through JSON parser/builder to pretty print it.
+ JsonParser parser = new JsonParser();
+ JsonObject json = parser.parse(result.toString()).getAsJsonObject();
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ try (PrintWriter w = response.getWriter()) {
+ w.println(gson.toJson(json));
+ }
+ }
+ }
+ catch (Exception e) {
+ LOG.error("Exception while processing locking stats request", e);
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ }
+ }
+}
\ No newline at end of file