You are viewing a plain-text version of this content. The canonical version is available at its original archive link (the hyperlink is not preserved in this plain-text copy).
Posted to commits@hive.apache.org by ab...@apache.org on 2019/11/22 18:11:18 UTC
[hive] branch master updated: HIVE-22514: HiveProtoLoggingHook might consume lots of memory (Attila Magyar via Slim Bouguerra)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new d6ae486 HIVE-22514: HiveProtoLoggingHook might consume lots of memory (Attila Magyar via Slim Bouguerra)
d6ae486 is described below
commit d6ae48693499bad765e236bba84ac8b76c013dc6
Author: Attila Magyar <am...@hortonworks.com>
AuthorDate: Fri Nov 22 18:45:59 2019 +0100
HIVE-22514: HiveProtoLoggingHook might consume lots of memory (Attila Magyar via Slim Bouguerra)
Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 2 ++
.../hadoop/hive/ql/hooks/HiveProtoLoggingHook.java | 21 +++++++++++++++------
.../hive/ql/hooks/TestHiveProtoLoggingHook.java | 22 ++++++++++++++++++++++
3 files changed, 39 insertions(+), 6 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a7687d5..bf03fe6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -675,6 +675,8 @@ public class HiveConf extends Configuration {
"Table alias will be added to column names for queries of type \"select *\" or \n" +
"if query explicitly uses table alias \"select r1.x..\"."),
+ HIVE_PROTO_EVENTS_QUEUE_CAPACITY("hive.hook.proto.queue.capacity", 64,
+ "Queue capacity for the proto events logging threads."),
HIVE_PROTO_EVENTS_BASE_PATH("hive.hook.proto.base-directory", "",
"Base directory into which the proto event messages are written by HiveProtoLoggingHook."),
HIVE_PROTO_EVENTS_ROLLOVER_CHECK_INTERVAL("hive.hook.proto.rollover-interval", "600s",
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index 8eab548..86a6800 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -92,9 +92,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -179,7 +178,8 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
private final Clock clock;
private final String logFileName;
private final DatePartitionedLogger<HiveHookEventProto> logger;
- private final ScheduledExecutorService logWriter;
+ private final ScheduledThreadPoolExecutor logWriter;
+ private final int queueCapacity;
private int logFileCount = 0;
private ProtoMessageWriter<HiveHookEventProto> writer;
private LocalDate writerDate;
@@ -189,6 +189,8 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
this.clock = clock;
// randomUUID is slow, since its cryptographically secure, only first query will take time.
this.logFileName = "hive_" + UUID.randomUUID().toString();
+ this.queueCapacity = conf.getInt(ConfVars.HIVE_PROTO_EVENTS_QUEUE_CAPACITY.varname,
+ ConfVars.HIVE_PROTO_EVENTS_QUEUE_CAPACITY.defaultIntVal);
String baseDir = conf.getVar(ConfVars.HIVE_PROTO_EVENTS_BASE_PATH);
if (StringUtils.isBlank(baseDir)) {
baseDir = null;
@@ -214,7 +216,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Hive Hook Proto Log Writer %d").build();
- logWriter = Executors.newSingleThreadScheduledExecutor(threadFactory);
+ logWriter = new ScheduledThreadPoolExecutor(1, threadFactory);
long rolloverInterval = conf.getTimeVar(
HiveConf.ConfVars.HIVE_PROTO_EVENTS_ROLLOVER_CHECK_INTERVAL, TimeUnit.MICROSECONDS);
@@ -267,10 +269,17 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
}
if (event != null) {
try {
- logWriter.execute(() -> writeEvent(event));
+ // ScheduledThreadPoolExecutor uses an unbounded queue which cannot be replaced with a bounded queue.
+ // Therefore checking queue capacity manually here.
+ if (logWriter.getQueue().size() < queueCapacity) {
+ logWriter.execute(() -> writeEvent(event));
+ } else {
+ LOG.warn("Writer queue full ignoring event {} for query {}",
+ hookContext.getHookType(), plan.getQueryId());
+ }
} catch (RejectedExecutionException e) {
LOG.warn("Writer queue full ignoring event {} for query {}",
- hookContext.getHookType(), plan.getQueryId());
+ hookContext.getHookType(), plan.getQueryId());
}
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
index 450a0b5..add4b68 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.hooks;
+import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -73,6 +74,7 @@ public class TestHiveProtoLoggingHook {
public void setup() throws Exception {
conf = new HiveConf();
conf.set(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, "llap_queue");
+ conf.set(HiveConf.ConfVars.HIVE_PROTO_EVENTS_QUEUE_CAPACITY.varname, "3");
conf.set(MRJobConfig.QUEUE_NAME, "mr_queue");
conf.set(TezConfiguration.TEZ_QUEUE_NAME, "tez_queue");
tmpFolder = folder.newFolder().getAbsolutePath();
@@ -164,6 +166,26 @@ public class TestHiveProtoLoggingHook {
Assert.assertEquals(event.getQueue(), "llap_queue");
}
+ @org.junit.Ignore("might fail intermittently")
+ @Test
+ public void testDropsEventWhenQueueIsFull() throws Exception {
+ EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance());
+ context.setHookType(HookType.PRE_EXEC_HOOK);
+ evtLogger.handle(context);
+ evtLogger.handle(context);
+ evtLogger.handle(context);
+ evtLogger.handle(context);
+ evtLogger.shutdown();
+ ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder);
+ reader.readEvent();
+ reader.readEvent();
+ reader.readEvent();
+ try {
+ reader.readEvent();
+ Assert.fail("Expected 3 events due to queue capacity limit, got 4.");
+ } catch (EOFException expected) {}
+ }
+
@Test
public void testPreAndPostEventBoth() throws Exception {
context.setHookType(HookType.PRE_EXEC_HOOK);