You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ab...@apache.org on 2019/11/22 18:11:18 UTC

[hive] branch master updated: HIVE-22514: HiveProtoLoggingHook might consume lots of memory (Attila Magyar via Slim Bouguerra)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new d6ae486  HIVE-22514: HiveProtoLoggingHook might consume lots of memory (Attila Magyar via Slim Bouguerra)
d6ae486 is described below

commit d6ae48693499bad765e236bba84ac8b76c013dc6
Author: Attila Magyar <am...@hortonworks.com>
AuthorDate: Fri Nov 22 18:45:59 2019 +0100

    HIVE-22514: HiveProtoLoggingHook might consume lots of memory (Attila Magyar via Slim Bouguerra)
    
    Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  2 ++
 .../hadoop/hive/ql/hooks/HiveProtoLoggingHook.java | 21 +++++++++++++++------
 .../hive/ql/hooks/TestHiveProtoLoggingHook.java    | 22 ++++++++++++++++++++++
 3 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a7687d5..bf03fe6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -675,6 +675,8 @@ public class HiveConf extends Configuration {
         "Table alias will be added to column names for queries of type \"select *\" or \n" +
         "if query explicitly uses table alias \"select r1.x..\"."),
 
+    HIVE_PROTO_EVENTS_QUEUE_CAPACITY("hive.hook.proto.queue.capacity", 64,
+      "Queue capacity for the proto events logging threads."),
     HIVE_PROTO_EVENTS_BASE_PATH("hive.hook.proto.base-directory", "",
             "Base directory into which the proto event messages are written by HiveProtoLoggingHook."),
     HIVE_PROTO_EVENTS_ROLLOVER_CHECK_INTERVAL("hive.hook.proto.rollover-interval", "600s",
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index 8eab548..86a6800 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -92,9 +92,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -179,7 +178,8 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
     private final Clock clock;
     private final String logFileName;
     private final DatePartitionedLogger<HiveHookEventProto> logger;
-    private final ScheduledExecutorService logWriter;
+    private final ScheduledThreadPoolExecutor logWriter;
+    private final int queueCapacity;
     private int logFileCount = 0;
     private ProtoMessageWriter<HiveHookEventProto> writer;
     private LocalDate writerDate;
@@ -189,6 +189,8 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       this.clock = clock;
       // randomUUID is slow, since its cryptographically secure, only first query will take time.
       this.logFileName = "hive_" + UUID.randomUUID().toString();
+      this.queueCapacity = conf.getInt(ConfVars.HIVE_PROTO_EVENTS_QUEUE_CAPACITY.varname,
+        ConfVars.HIVE_PROTO_EVENTS_QUEUE_CAPACITY.defaultIntVal);
       String baseDir = conf.getVar(ConfVars.HIVE_PROTO_EVENTS_BASE_PATH);
       if (StringUtils.isBlank(baseDir)) {
         baseDir = null;
@@ -214,7 +216,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
 
       ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
           .setNameFormat("Hive Hook Proto Log Writer %d").build();
-      logWriter = Executors.newSingleThreadScheduledExecutor(threadFactory);
+      logWriter = new ScheduledThreadPoolExecutor(1, threadFactory);
 
       long rolloverInterval = conf.getTimeVar(
           HiveConf.ConfVars.HIVE_PROTO_EVENTS_ROLLOVER_CHECK_INTERVAL, TimeUnit.MICROSECONDS);
@@ -267,10 +269,17 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       }
       if (event != null) {
         try {
-          logWriter.execute(() -> writeEvent(event));
+          // ScheduledThreadPoolExecutor uses an unbounded queue which cannot be replaced with a bounded queue.
+          // Therefore checking queue capacity manually here.
+          if (logWriter.getQueue().size() < queueCapacity) {
+            logWriter.execute(() -> writeEvent(event));
+          } else {
+            LOG.warn("Writer queue full ignoring event {} for query {}",
+              hookContext.getHookType(), plan.getQueryId());
+          }
         } catch (RejectedExecutionException e) {
           LOG.warn("Writer queue full ignoring event {} for query {}",
-              hookContext.getHookType(), plan.getQueryId());
+            hookContext.getHookType(), plan.getQueryId());
         }
       }
     }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
index 450a0b5..add4b68 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.hooks;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -73,6 +74,7 @@ public class TestHiveProtoLoggingHook {
   public void setup() throws Exception {
     conf = new HiveConf();
     conf.set(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, "llap_queue");
+    conf.set(HiveConf.ConfVars.HIVE_PROTO_EVENTS_QUEUE_CAPACITY.varname, "3");
     conf.set(MRJobConfig.QUEUE_NAME, "mr_queue");
     conf.set(TezConfiguration.TEZ_QUEUE_NAME, "tez_queue");
     tmpFolder = folder.newFolder().getAbsolutePath();
@@ -164,6 +166,26 @@ public class TestHiveProtoLoggingHook {
     Assert.assertEquals(event.getQueue(), "llap_queue");
   }
 
+  @org.junit.Ignore("might fail intermittently")
+  @Test
+  public void testDropsEventWhenQueueIsFull() throws Exception {
+    EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance());
+    context.setHookType(HookType.PRE_EXEC_HOOK);
+    evtLogger.handle(context);
+    evtLogger.handle(context);
+    evtLogger.handle(context);
+    evtLogger.handle(context);
+    evtLogger.shutdown();
+    ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder);
+    reader.readEvent();
+    reader.readEvent();
+    reader.readEvent();
+    try {
+      reader.readEvent();
+      Assert.fail("Expected 3 events due to queue capacity limit, got 4.");
+    } catch (EOFException expected) {}
+  }
+
   @Test
   public void testPreAndPostEventBoth() throws Exception {
     context.setHookType(HookType.PRE_EXEC_HOOK);