Posted to commits@hive.apache.org by bs...@apache.org on 2019/03/25 19:15:11 UTC

[hive] branch master updated: HIVE-21204: Instrumentation for read/write locks in LLAP (Olli Draese via Slim Bouguerra)

This is an automated email from the ASF dual-hosted git repository.

bslim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new c3ec20d  HIVE-21204: Instrumentation for read/write locks in LLAP (Olli Draese via Slim Bouguerra)
c3ec20d is described below

commit c3ec20dd4f5b5fbde4007041844f6aed8c262ca1
Author: Olli Draese <od...@cloudera.com>
AuthorDate: Mon Mar 25 12:14:53 2019 -0700

    HIVE-21204: Instrumentation for read/write locks in LLAP (Olli Draese via Slim Bouguerra)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  3 ++
 .../hive/llap/cache/SerDeLowLevelCacheImpl.java    | 46 +++++++++++++++++++---
 .../hadoop/hive/llap/daemon/impl/QueryTracker.java | 39 +++++++++++++-----
 .../llap/daemon/services/impl/LlapWebServices.java | 11 ++++--
 .../hadoop/hive/llap/io/api/impl/LlapIoImpl.java   |  1 +
 .../llap/io/encoded/SerDeEncodedDataReader.java    |  2 +-
 .../llap/tezplugins/LlapTaskSchedulerService.java  | 29 ++++++++++++--
 7 files changed, 108 insertions(+), 23 deletions(-)
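
The pattern repeated throughout this commit is to route every ReentrantReadWriteLock through ReadWriteLockMetrics.wrap(conf, lock, metricsSource), which presumably hands back the raw lock when hive.llap.lockmetrics.collect is off and a timing delegate when it is on. ReadWriteLockMetrics itself is not part of this diff, so the following is only a minimal sketch of that wrapping idea; the class name, counters and granularity are assumptions, not the actual implementation.

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReadWriteLock;

    // Sketch only: illustrates the wrap-or-pass-through pattern, not the
    // real org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics class.
    final class LockMetricsSketch {
      static final AtomicLong READ_WAIT_NANOS = new AtomicLong();
      static final AtomicLong WRITE_WAIT_NANOS = new AtomicLong();

      static ReadWriteLock wrap(boolean collectMetrics, ReadWriteLock inner) {
        if (!collectMetrics) {
          return inner;                 // disabled: no wrapper, no overhead
        }
        return new ReadWriteLock() {
          @Override public Lock readLock() {
            return timed(inner.readLock(), READ_WAIT_NANOS);
          }
          @Override public Lock writeLock() {
            return timed(inner.writeLock(), WRITE_WAIT_NANOS);
          }
        };
      }

      // Record the time a caller spends blocked before acquiring the lock.
      private static Lock timed(Lock inner, AtomicLong waitNanos) {
        return new Lock() {
          @Override public void lock() {
            long start = System.nanoTime();
            inner.lock();
            waitNanos.addAndGet(System.nanoTime() - start);
          }
          @Override public void lockInterruptibly() throws InterruptedException {
            long start = System.nanoTime();
            inner.lockInterruptibly();
            waitNanos.addAndGet(System.nanoTime() - start);
          }
          @Override public boolean tryLock() { return inner.tryLock(); }
          @Override public boolean tryLock(long time, TimeUnit unit)
              throws InterruptedException {
            return inner.tryLock(time, unit);
          }
          @Override public void unlock() { inner.unlock(); }
          @Override public Condition newCondition() { return inner.newCondition(); }
        };
      }
    }

Returning the unwrapped lock when the flag is off keeps the common path free of any instrumentation cost, which is why the new config option below defaults to false.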

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0c783e1..94902de 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4400,6 +4400,9 @@ public class HiveConf extends Configuration {
         "logger used for llap-daemons."),
     LLAP_OUTPUT_FORMAT_ARROW("hive.llap.output.format.arrow", true,
       "Whether LLapOutputFormatService should output arrow batches"),
+    LLAP_COLLECT_LOCK_METRICS("hive.llap.lockmetrics.collect", false,
+        "Whether lock metrics (wait times, counts) are collected for LLAP "
+        + "related locks"),
 
     HIVE_TRIGGER_VALIDATION_INTERVAL("hive.trigger.validation.interval", "500ms",
       new TimeValidator(TimeUnit.MILLISECONDS),
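
The new flag defaults to false, so nothing is collected unless an operator opts in via hive-site.xml or programmatically; setBoolVar is the standard HiveConf accessor for boolean ConfVars:

    import org.apache.hadoop.hive.conf.HiveConf;

    HiveConf conf = new HiveConf();
    // opt in to wait-time/count collection for LLAP read/write locks
    conf.setBoolVar(HiveConf.ConfVars.LLAP_COLLECT_LOCK_METRICS, true);
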
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
index c41b34a..2a39d2d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
@@ -15,8 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.hive.llap.cache;
 
+import com.google.common.base.Function;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -29,6 +32,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.io.Allocator;
 import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
 import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
@@ -37,14 +42,17 @@ import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
+import org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hive.common.util.Ref;
 import org.apache.orc.OrcProto;
 import org.apache.orc.OrcProto.ColumnEncoding;
 
-import com.google.common.base.Function;
-
-public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDump {
+public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDump, Configurable {
   private static final int DEFAULT_CLEANUP_INTERVAL = 600;
+  private Configuration conf;
   private final Allocator allocator;
   private final AtomicInteger newEvictions = new AtomicInteger(0);
   private Thread cleanupThread = null;
@@ -53,6 +61,18 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDu
   private final long cleanupInterval;
   private final LlapDaemonCacheMetrics metrics;
 
+  // Shared singleton MetricsSource instance for all FileData locks
+  private static final MetricsSource LOCK_METRICS;
+
+  static {
+    // create and register the MetricsSource for lock metrics
+    MetricsSystem ms = LlapMetricsSystem.instance();
+    ms.register("FileDataLockMetrics",
+                "Lock metrics for R/W locks around FileData instances",
+                LOCK_METRICS =
+                    ReadWriteLockMetrics.createLockMetricsSource("FileData"));
+  }
+
   public static final class LlapSerDeDataBuffer extends LlapAllocatorBuffer {
     public boolean isCached = false;
     private String tag;
@@ -90,14 +110,18 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDu
      * TODO: make more granular? We only care that each one reader sees consistent boundaries.
      *       So, we could shallow-copy the stripes list, then have individual locks inside each.
      */
-    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final ReadWriteLock rwLock;
     private final Object fileKey;
     private final int colCount;
     private ArrayList<StripeData> stripes;
 
-    public FileData(Object fileKey, int colCount) {
+    public FileData(Configuration conf, Object fileKey, int colCount) {
       this.fileKey = fileKey;
       this.colCount = colCount;
+
+      rwLock = ReadWriteLockMetrics.wrap(conf,
+                                         new ReentrantReadWriteLock(),
+                                         LOCK_METRICS);
     }
 
     public void toString(StringBuilder sb) {
@@ -298,7 +322,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDu
           throw new IOException("Includes " + DebugUtils.toString(includes) + " for "
               + cached.colCount + " columns");
         }
-        FileData result = new FileData(cached.fileKey, cached.colCount);
+        FileData result = new FileData(conf, cached.fileKey, cached.colCount);
         if (gotAllData != null) {
           gotAllData.value = true;
         }
@@ -727,6 +751,16 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDu
   }
 
   @Override
+  public void setConf(Configuration newConf) {
+    this.conf = newConf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
   public void debugDumpShort(StringBuilder sb) {
     sb.append("\nSerDe cache state ");
     int allLocked = 0, allUnlocked = 0, allEvicted = 0, allMoving = 0;
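
The static initializer above registers a single shared MetricsSource for all FileData locks, so every per-file lock aggregates into one source rather than flooding the metrics system with per-instance sources. A hedged sketch of that register-once pattern against the plain Hadoop metrics2 API (DefaultMetricsSystem stands in for the LLAP-specific LlapMetricsSystem, and the gauge shown is illustrative):

    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.hadoop.metrics2.MetricsSource;
    import org.apache.hadoop.metrics2.MetricsSystem;
    import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
    import org.apache.hadoop.metrics2.lib.Interns;

    final class SharedLockMetrics {
      // counter updated by the instrumented locks (see the wrap() sketch above)
      static final AtomicLong READ_WAIT_NANOS = new AtomicLong();

      // one MetricsSource shared by all lock instances
      static final MetricsSource LOCK_METRICS = (collector, all) ->
          collector.addRecord("FileDataLockMetrics")
                   .addGauge(Interns.info("ReadLockWaitNanos",
                       "Total time spent waiting for read locks"),
                       READ_WAIT_NANOS.get());

      static {
        // registered exactly once, when the owning class is loaded
        MetricsSystem ms = DefaultMetricsSystem.instance();
        ms.register("FileDataLockMetrics",
            "Lock metrics for R/W locks around FileData instances", LOCK_METRICS);
      }
    }
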
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 932d0ad..0d5713a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -24,10 +24,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker;
 import org.apache.hadoop.hive.llap.log.LogHelpers;
+import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
+import org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.logging.slf4j.Log4jMarker;
 import org.apache.tez.common.CallableWithNdc;
 
@@ -41,6 +42,8 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableV
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
 import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
 import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
 import org.slf4j.Logger;
@@ -66,12 +69,26 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 public class QueryTracker extends AbstractService {
 
   private static final Logger LOG = LoggerFactory.getLogger(QueryTracker.class);
-  private static final Marker QUERY_COMPLETE_MARKER = new Log4jMarker(new Log4jQueryCompleteMarker());
+  private static final Marker QUERY_COMPLETE_MARKER =
+      new Log4jMarker(new Log4jQueryCompleteMarker());
+
+  // Shared singleton MetricsSource instance for all DAG locks
+  private static final MetricsSource LOCK_METRICS;
+
+  static {
+    // create and register the MetricsSource for lock metrics
+    MetricsSystem ms = LlapMetricsSystem.instance();
+    LOCK_METRICS = ReadWriteLockMetrics.createLockMetricsSource("QueryTracker");
+
+    ms.register("QueryTrackerDAGLockMetrics",
+                "Lock metrics for QueryTracher DAG instances", LOCK_METRICS);
+  }
 
   private final ScheduledExecutorService executorService;
 
   private final ConcurrentHashMap<QueryIdentifier, QueryInfo> queryInfoMap = new ConcurrentHashMap<>();
 
+  private final Configuration conf;
   private final String[] localDirsBase;
   private final FileSystem localFs;
   private final String clusterId;
@@ -91,7 +108,7 @@ public class QueryTracker extends AbstractService {
 
 
   private final Lock lock = new ReentrantLock();
-  private final ConcurrentMap<QueryIdentifier, ReentrantReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<>();
+  private final ConcurrentMap<QueryIdentifier, ReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<>();
 
   // Tracks various maps for dagCompletions. This is setup here since stateChange messages
   // may be processed by a thread which ends up executing before a task.
@@ -109,6 +126,8 @@ public class QueryTracker extends AbstractService {
     super("QueryTracker");
     this.localDirsBase = localDirsBase;
     this.clusterId = clusterId;
+    this.conf = conf;
+
     try {
       localFs = FileSystem.getLocal(conf);
     } catch (IOException e) {
@@ -380,16 +399,18 @@ public class QueryTracker extends AbstractService {
     }
   }
 
-  private ReentrantReadWriteLock getDagLockNoCreate(QueryIdentifier queryIdentifier) {
+  private ReadWriteLock getDagLockNoCreate(QueryIdentifier queryIdentifier) {
     return dagSpecificLocks.get(queryIdentifier);
   }
 
-  private ReentrantReadWriteLock getDagLock(QueryIdentifier queryIdentifier) {
+  private ReadWriteLock getDagLock(QueryIdentifier queryIdentifier) {
     lock.lock();
     try {
-      ReentrantReadWriteLock dagLock = dagSpecificLocks.get(queryIdentifier);
+      ReadWriteLock dagLock = dagSpecificLocks.get(queryIdentifier);
       if (dagLock == null) {
-        dagLock = new ReentrantReadWriteLock();
+        dagLock = ReadWriteLockMetrics.wrap(conf,
+                                            new ReentrantReadWriteLock(),
+                                            LOCK_METRICS);
         dagSpecificLocks.put(queryIdentifier, dagLock);
       }
       return dagLock;
@@ -477,7 +498,7 @@ public class QueryTracker extends AbstractService {
     @Override
     protected Void callInternal() {
       LOG.info("External cleanup callable for {}", queryIdentifier);
-      ReentrantReadWriteLock dagLock = getDagLockNoCreate(queryIdentifier);
+      ReadWriteLock dagLock = getDagLockNoCreate(queryIdentifier);
       if (dagLock == null) {
         if (LOG.isTraceEnabled()) {
           LOG.trace("null dagLock. No cleanup required at the moment for {}", queryIdString);
@@ -528,7 +549,7 @@ public class QueryTracker extends AbstractService {
 
   private void handleFragmentCompleteExternalQuery(QueryInfo queryInfo) {
     if (queryInfo.isExternalQuery()) {
-      ReentrantReadWriteLock dagLock = getDagLock(queryInfo.getQueryIdentifier());
+      ReadWriteLock dagLock = getDagLock(queryInfo.getQueryIdentifier());
       if (dagLock == null) {
         LOG.warn("Ignoring fragment completion for unknown query: {}",
             queryInfo.getQueryIdentifier());
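
One side note on getDagLock(): because dagSpecificLocks is a ConcurrentMap, the create-if-absent step could also be written with computeIfAbsent instead of the explicit guard lock, as in the sketch below. The guard lock in the actual code may well be intentional (e.g. to coordinate with the removal paths), so this is an observation, not a correction:

    private ReadWriteLock getDagLock(QueryIdentifier queryIdentifier) {
      // atomically create and install the wrapped lock on first access
      return dagSpecificLocks.computeIfAbsent(queryIdentifier,
          k -> ReadWriteLockMetrics.wrap(conf, new ReentrantReadWriteLock(),
                                         LOCK_METRICS));
    }
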
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
index b944fad..3c124f9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.hive.llap.daemon.services.impl;
 
 import java.io.IOException;
@@ -22,7 +23,6 @@ import java.io.PrintWriter;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 
-import javax.management.MalformedObjectNameException;
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
@@ -36,7 +36,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
-import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl;
 import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
@@ -58,8 +57,8 @@ public class LlapWebServices extends AbstractService {
   static final String ACCESS_CONTROL_ALLOW_METHODS = "Access-Control-Allow-Methods";
   static final String ACCESS_CONTROL_ALLOW_ORIGIN = "Access-Control-Allow-Origin";
 
-  static final String REGISTRY_ATTRIBUTE="llap.registry";
-  static final String PARENT_ATTRIBUTE="llap.parent";
+  static final String REGISTRY_ATTRIBUTE = "llap.registry";
+  static final String PARENT_ATTRIBUTE = "llap.parent";
 
   private int port;
   private HttpServer http;
@@ -100,12 +99,16 @@ public class LlapWebServices extends AbstractService {
     builder.setContextAttribute(REGISTRY_ATTRIBUTE, registry);
     builder.setContextAttribute(PARENT_ATTRIBUTE, parent);
 
+    // make conf available to the locking stats servlet
+    LlapLockingServlet.setConf(conf);
+
     try {
       this.http = builder.build();
       this.http.addServlet("status", "/status", LlapStatusServlet.class);
       this.http.addServlet("peers", "/peers", LlapPeerRegistryServlet.class);
       this.http.addServlet("iomem", "/iomem", LlapIoMemoryServlet.class);
       this.http.addServlet("system", "/system", SystemConfigurationServlet.class);
+      this.http.addServlet("locking", "/locking", LlapLockingServlet.class);
     } catch (IOException e) {
       LOG.warn("LLAP web service failed to come up", e);
     }
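
LlapLockingServlet itself is not included in this diff, so its contents are unknown here; purely as an assumption, a minimal servlet in the style of the sibling /status endpoints could look like this (class and field names hypothetical):

    import java.io.IOException;
    import java.io.PrintWriter;

    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;

    import org.apache.hadoop.conf.Configuration;

    // Hypothetical sketch of a /locking endpoint, not the actual servlet.
    public class LockingServletSketch extends HttpServlet {
      private static Configuration conf;  // injected by the web service at startup

      public static void setConf(Configuration c) {
        conf = c;
      }

      @Override
      protected void doGet(HttpServletRequest req, HttpServletResponse resp)
          throws IOException {
        resp.setContentType("application/json");
        try (PrintWriter out = resp.getWriter()) {
          // a real implementation would render the collected lock metrics here
          boolean enabled = conf != null
              && conf.getBoolean("hive.llap.lockmetrics.collect", false);
          out.println("{\"lockMetricsCollected\": " + enabled + "}");
        }
      }
    }
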
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 2fffeb8..c63ee5f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -167,6 +167,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
         SerDeLowLevelCacheImpl serdeCacheImpl = new SerDeLowLevelCacheImpl(
             cacheMetrics, cachePolicyWrapper, allocator);
         serdeCache = serdeCacheImpl;
+        serdeCacheImpl.setConf(conf);
       }
 
       boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index a5671e9..462b25f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -777,7 +777,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
           throw new AssertionError("Caching data without an encoding at " + i + ": " + sd);
         }
       }
-      FileData fd = new FileData(fileKey, encodings.length);
+      FileData fd = new FileData(daemonConf, fileKey, encodings.length);
       fd.addStripe(sd);
       cache.putFileData(fd, Priority.NORMAL, counters, cacheTag);
     } else {
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 560cbaa..cdf767f 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -17,6 +17,8 @@ package org.apache.hadoop.hive.llap.tezplugins;
 import com.google.common.io.ByteArrayDataOutput;
 
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl;
@@ -59,6 +61,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -70,6 +73,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
 import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
+import org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
 import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
@@ -175,6 +179,19 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
   }
 
+  // Shared singleton MetricsSource instance for the task scheduler locks
+  private static final MetricsSource LOCK_METRICS;
+
+  static {
+    // create and register the MetricsSource for lock metrics
+    MetricsSystem ms = LlapMetricsSystem.instance();
+    LOCK_METRICS =
+        ReadWriteLockMetrics.createLockMetricsSource("TaskScheduler");
+
+    ms.register("LLAPTaskSchedulerLockMetrics",
+                "Lock metrics for R/W locks LLAP task scheduler", LOCK_METRICS);
+  }
+
   // TODO: this is an ugly hack; see the same in LlapTaskCommunicator for discussion.
   //       This only lives for the duration of the service init.
   static LlapTaskSchedulerService instance = null;
@@ -225,9 +242,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   @VisibleForTesting
   final DelayedTaskSchedulerCallable delayedTaskSchedulerCallable;
 
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
-  private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+  private final ReadWriteLock lock;
+  private final Lock readLock;
+  private final Lock writeLock;
 
   private final Lock scheduleLock = new ReentrantLock();
   private final Condition scheduleCondition = scheduleLock.newCondition();
@@ -321,6 +338,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           "Failed to parse user payload for " + LlapTaskSchedulerService.class.getSimpleName(), e);
     }
 
+    lock = ReadWriteLockMetrics.wrap(conf,
+                                     new ReentrantReadWriteLock(),
+                                     LOCK_METRICS);
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
+
     if (conf.getBoolean(LLAP_PLUGIN_ENDPOINT_ENABLED, false)) {
       JobTokenSecretManager sm = null;
       if (UserGroupInformation.isSecurityEnabled()) {
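
Once a source is registered, Hadoop metrics2 typically also exposes it as a JMX MBean (Hadoop:service=<prefix>,name=<source>), so the new lock metrics can be inspected without the /locking endpoint. The object name below assumes an "llap" service prefix, which should be verified against what jconsole actually shows:

    import java.lang.management.ManagementFactory;

    import javax.management.MBeanAttributeInfo;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class DumpLockMetrics {
      public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        // assumed name; the actual service prefix comes from LlapMetricsSystem
        ObjectName name = new ObjectName(
            "Hadoop:service=llap,name=LLAPTaskSchedulerLockMetrics");
        for (MBeanAttributeInfo attr : mbs.getMBeanInfo(name).getAttributes()) {
          System.out.println(attr.getName() + " = "
              + mbs.getAttribute(name, attr.getName()));
        }
      }
    }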