You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by bs...@apache.org on 2019/03/25 19:15:11 UTC
[hive] branch master updated: HIVE-21204: Instrumentation for
read/write locks in LLAP (Olli Draese via Slim Bouguerra)
This is an automated email from the ASF dual-hosted git repository.
bslim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new c3ec20d HIVE-21204: Instrumentation for read/write locks in LLAP (Olli Draese via Slim Bouguerra)
c3ec20d is described below
commit c3ec20dd4f5b5fbde4007041844f6aed8c262ca1
Author: Olli Draese <od...@cloudera.com>
AuthorDate: Mon Mar 25 12:14:53 2019 -0700
HIVE-21204: Instrumentation for read/write locks in LLAP (Olli Draese via Slim Bouguerra)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 3 ++
.../hive/llap/cache/SerDeLowLevelCacheImpl.java | 46 +++++++++++++++++++---
.../hadoop/hive/llap/daemon/impl/QueryTracker.java | 39 +++++++++++++-----
.../llap/daemon/services/impl/LlapWebServices.java | 11 ++++--
.../hadoop/hive/llap/io/api/impl/LlapIoImpl.java | 1 +
.../llap/io/encoded/SerDeEncodedDataReader.java | 2 +-
.../llap/tezplugins/LlapTaskSchedulerService.java | 29 ++++++++++++--
7 files changed, 108 insertions(+), 23 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0c783e1..94902de 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4400,6 +4400,9 @@ public class HiveConf extends Configuration {
"logger used for llap-daemons."),
LLAP_OUTPUT_FORMAT_ARROW("hive.llap.output.format.arrow", true,
"Whether LLapOutputFormatService should output arrow batches"),
+ LLAP_COLLECT_LOCK_METRICS("hive.llap.lockmetrics.collect", false,
+ "Whether lock metrics (wait times, counts) are collected for LLAP "
+ + "related locks"),
HIVE_TRIGGER_VALIDATION_INTERVAL("hive.trigger.validation.interval", "500ms",
new TimeValidator(TimeUnit.MILLISECONDS),
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
index c41b34a..2a39d2d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
@@ -15,8 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.hive.llap.cache;
+import com.google.common.base.Function;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -29,6 +32,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.io.Allocator;
import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
@@ -37,14 +42,17 @@ import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
+import org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hive.common.util.Ref;
import org.apache.orc.OrcProto;
import org.apache.orc.OrcProto.ColumnEncoding;
-import com.google.common.base.Function;
-
-public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDump {
+public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDump, Configurable {
private static final int DEFAULT_CLEANUP_INTERVAL = 600;
+ private Configuration conf;
private final Allocator allocator;
private final AtomicInteger newEvictions = new AtomicInteger(0);
private Thread cleanupThread = null;
@@ -53,6 +61,18 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDu
private final long cleanupInterval;
private final LlapDaemonCacheMetrics metrics;
+ /// Shared singleton MetricsSource instance for all FileData locks
+ private static final MetricsSource LOCK_METRICS;
+
+ static {
+ // create and register the MetricsSource for lock metrics
+ MetricsSystem ms = LlapMetricsSystem.instance();
+ ms.register("FileDataLockMetrics",
+ "Lock metrics for R/W locks around FileData instances",
+ LOCK_METRICS =
+ ReadWriteLockMetrics.createLockMetricsSource("FileData"));
+ }
+
public static final class LlapSerDeDataBuffer extends LlapAllocatorBuffer {
public boolean isCached = false;
private String tag;
@@ -90,14 +110,18 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDu
* TODO: make more granular? We only care that each one reader sees consistent boundaries.
* So, we could shallow-copy the stripes list, then have individual locks inside each.
*/
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+ private final ReadWriteLock rwLock;
private final Object fileKey;
private final int colCount;
private ArrayList<StripeData> stripes;
- public FileData(Object fileKey, int colCount) {
+ public FileData(Configuration conf, Object fileKey, int colCount) {
this.fileKey = fileKey;
this.colCount = colCount;
+
+ rwLock = ReadWriteLockMetrics.wrap(conf,
+ new ReentrantReadWriteLock(),
+ LOCK_METRICS);
}
public void toString(StringBuilder sb) {
@@ -298,7 +322,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDu
throw new IOException("Includes " + DebugUtils.toString(includes) + " for "
+ cached.colCount + " columns");
}
- FileData result = new FileData(cached.fileKey, cached.colCount);
+ FileData result = new FileData(conf, cached.fileKey, cached.colCount);
if (gotAllData != null) {
gotAllData.value = true;
}
@@ -727,6 +751,16 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDu
}
@Override
+ public void setConf(Configuration newConf) {
+ this.conf = newConf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
public void debugDumpShort(StringBuilder sb) {
sb.append("\nSerDe cache state ");
int allLocked = 0, allUnlocked = 0, allEvicted = 0, allMoving = 0;
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 932d0ad..0d5713a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -24,10 +24,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker;
import org.apache.hadoop.hive.llap.log.LogHelpers;
+import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
+import org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.StringUtils;
import org.apache.logging.slf4j.Log4jMarker;
import org.apache.tez.common.CallableWithNdc;
@@ -41,6 +42,8 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableV
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
import org.slf4j.Logger;
@@ -66,12 +69,26 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class QueryTracker extends AbstractService {
private static final Logger LOG = LoggerFactory.getLogger(QueryTracker.class);
- private static final Marker QUERY_COMPLETE_MARKER = new Log4jMarker(new Log4jQueryCompleteMarker());
+ private static final Marker QUERY_COMPLETE_MARKER =
+ new Log4jMarker(new Log4jQueryCompleteMarker());
+
+ /// Shared singleton MetricsSource instance for all DAG locks
+ private static final MetricsSource LOCK_METRICS;
+
+ static {
+ // create and register the MetricsSource for lock metrics
+ MetricsSystem ms = LlapMetricsSystem.instance();
+ LOCK_METRICS = ReadWriteLockMetrics.createLockMetricsSource("QueryTracker");
+
+ ms.register("QueryTrackerDAGLockMetrics",
+ "Lock metrics for QueryTracker DAG instances", LOCK_METRICS);
+ }
private final ScheduledExecutorService executorService;
private final ConcurrentHashMap<QueryIdentifier, QueryInfo> queryInfoMap = new ConcurrentHashMap<>();
+ private final Configuration conf;
private final String[] localDirsBase;
private final FileSystem localFs;
private final String clusterId;
@@ -91,7 +108,7 @@ public class QueryTracker extends AbstractService {
private final Lock lock = new ReentrantLock();
- private final ConcurrentMap<QueryIdentifier, ReentrantReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<>();
+ private final ConcurrentMap<QueryIdentifier, ReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<>();
// Tracks various maps for dagCompletions. This is setup here since stateChange messages
// may be processed by a thread which ends up executing before a task.
@@ -109,6 +126,8 @@ public class QueryTracker extends AbstractService {
super("QueryTracker");
this.localDirsBase = localDirsBase;
this.clusterId = clusterId;
+ this.conf = conf;
+
try {
localFs = FileSystem.getLocal(conf);
} catch (IOException e) {
@@ -380,16 +399,18 @@ public class QueryTracker extends AbstractService {
}
}
- private ReentrantReadWriteLock getDagLockNoCreate(QueryIdentifier queryIdentifier) {
+ private ReadWriteLock getDagLockNoCreate(QueryIdentifier queryIdentifier) {
return dagSpecificLocks.get(queryIdentifier);
}
- private ReentrantReadWriteLock getDagLock(QueryIdentifier queryIdentifier) {
+ private ReadWriteLock getDagLock(QueryIdentifier queryIdentifier) {
lock.lock();
try {
- ReentrantReadWriteLock dagLock = dagSpecificLocks.get(queryIdentifier);
+ ReadWriteLock dagLock = dagSpecificLocks.get(queryIdentifier);
if (dagLock == null) {
- dagLock = new ReentrantReadWriteLock();
+ dagLock = ReadWriteLockMetrics.wrap(conf,
+ new ReentrantReadWriteLock(),
+ LOCK_METRICS);
dagSpecificLocks.put(queryIdentifier, dagLock);
}
return dagLock;
@@ -477,7 +498,7 @@ public class QueryTracker extends AbstractService {
@Override
protected Void callInternal() {
LOG.info("External cleanup callable for {}", queryIdentifier);
- ReentrantReadWriteLock dagLock = getDagLockNoCreate(queryIdentifier);
+ ReadWriteLock dagLock = getDagLockNoCreate(queryIdentifier);
if (dagLock == null) {
if (LOG.isTraceEnabled()) {
LOG.trace("null dagLock. No cleanup required at the moment for {}", queryIdString);
@@ -528,7 +549,7 @@ public class QueryTracker extends AbstractService {
private void handleFragmentCompleteExternalQuery(QueryInfo queryInfo) {
if (queryInfo.isExternalQuery()) {
- ReentrantReadWriteLock dagLock = getDagLock(queryInfo.getQueryIdentifier());
+ ReadWriteLock dagLock = getDagLock(queryInfo.getQueryIdentifier());
if (dagLock == null) {
LOG.warn("Ignoring fragment completion for unknown query: {}",
queryInfo.getQueryIdentifier());
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
index b944fad..3c124f9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.hive.llap.daemon.services.impl;
import java.io.IOException;
@@ -22,7 +23,6 @@ import java.io.PrintWriter;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
-import javax.management.MalformedObjectNameException;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
@@ -36,7 +36,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
-import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl;
import org.apache.hadoop.hive.registry.ServiceInstanceSet;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
@@ -58,8 +57,8 @@ public class LlapWebServices extends AbstractService {
static final String ACCESS_CONTROL_ALLOW_METHODS = "Access-Control-Allow-Methods";
static final String ACCESS_CONTROL_ALLOW_ORIGIN = "Access-Control-Allow-Origin";
- static final String REGISTRY_ATTRIBUTE="llap.registry";
- static final String PARENT_ATTRIBUTE="llap.parent";
+ static final String REGISTRY_ATTRIBUTE = "llap.registry";
+ static final String PARENT_ATTRIBUTE = "llap.parent";
private int port;
private HttpServer http;
@@ -100,12 +99,16 @@ public class LlapWebServices extends AbstractService {
builder.setContextAttribute(REGISTRY_ATTRIBUTE, registry);
builder.setContextAttribute(PARENT_ATTRIBUTE, parent);
+ // make conf available to the locking stats servlet
+ LlapLockingServlet.setConf(conf);
+
try {
this.http = builder.build();
this.http.addServlet("status", "/status", LlapStatusServlet.class);
this.http.addServlet("peers", "/peers", LlapPeerRegistryServlet.class);
this.http.addServlet("iomem", "/iomem", LlapIoMemoryServlet.class);
this.http.addServlet("system", "/system", SystemConfigurationServlet.class);
+ this.http.addServlet("locking", "/locking", LlapLockingServlet.class);
} catch (IOException e) {
LOG.warn("LLAP web service failed to come up", e);
}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 2fffeb8..c63ee5f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -167,6 +167,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
SerDeLowLevelCacheImpl serdeCacheImpl = new SerDeLowLevelCacheImpl(
cacheMetrics, cachePolicyWrapper, allocator);
serdeCache = serdeCacheImpl;
+ serdeCacheImpl.setConf(conf);
}
boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index a5671e9..462b25f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -777,7 +777,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
throw new AssertionError("Caching data without an encoding at " + i + ": " + sd);
}
}
- FileData fd = new FileData(fileKey, encodings.length);
+ FileData fd = new FileData(daemonConf, fileKey, encodings.length);
fd.addStripe(sd);
cache.putFileData(fd, Priority.NORMAL, counters, cacheTag);
} else {
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 560cbaa..cdf767f 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -17,6 +17,8 @@ package org.apache.hadoop.hive.llap.tezplugins;
import com.google.common.io.ByteArrayDataOutput;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl;
@@ -59,6 +61,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -70,6 +73,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
+import org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
@@ -175,6 +179,19 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
}
+ /// Shared singleton MetricsSource instance for all TaskScheduler locks
+ private static final MetricsSource LOCK_METRICS;
+
+ static {
+ // create and register the MetricsSource for lock metrics
+ MetricsSystem ms = LlapMetricsSystem.instance();
+ LOCK_METRICS =
+ ReadWriteLockMetrics.createLockMetricsSource("TaskScheduler");
+
+ ms.register("LLAPTaskSchedulerLockMetrics",
+ "Lock metrics for R/W locks in LLAP task scheduler", LOCK_METRICS);
+ }
+
// TODO: this is an ugly hack; see the same in LlapTaskCommunicator for discussion.
// This only lives for the duration of the service init.
static LlapTaskSchedulerService instance = null;
@@ -225,9 +242,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
@VisibleForTesting
final DelayedTaskSchedulerCallable delayedTaskSchedulerCallable;
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
- private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+ private final ReadWriteLock lock;
+ private final Lock readLock;
+ private final Lock writeLock;
private final Lock scheduleLock = new ReentrantLock();
private final Condition scheduleCondition = scheduleLock.newCondition();
@@ -321,6 +338,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
"Failed to parse user payload for " + LlapTaskSchedulerService.class.getSimpleName(), e);
}
+ lock = ReadWriteLockMetrics.wrap(conf,
+ new ReentrantReadWriteLock(),
+ LOCK_METRICS);
+ readLock = lock.readLock();
+ writeLock = lock.writeLock();
+
if (conf.getBoolean(LLAP_PLUGIN_ENDPOINT_ENABLED, false)) {
JobTokenSecretManager sm = null;
if (UserGroupInformation.isSecurityEnabled()) {