You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/10/26 21:12:19 UTC
[69/75] [abbrv] hive git commit: HIVE-20746: HiveProtoHookLogger does
not close file at end of day. (Harish JP, reviewed by Anishek Agarwal)
HIVE-20746: HiveProtoHookLogger does not close file at end of day. (Harish JP, reviewed by Anishek Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a99be34a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a99be34a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a99be34a
Branch: refs/heads/master-tez092
Commit: a99be34a0ef668babaae1ef0dfbf200e259bc907
Parents: 3cbc13e
Author: Anishek Agarwal <an...@gmail.com>
Authored: Thu Oct 25 11:18:59 2018 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Thu Oct 25 11:18:59 2018 +0530
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 5 +-
.../hive/ql/hooks/HiveProtoLoggingHook.java | 68 ++++++++++++--------
.../hive/ql/hooks/TestHiveProtoLoggingHook.java | 37 +++++++++++
3 files changed, 82 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/a99be34a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ed6d3d8..e226a1f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -643,8 +643,9 @@ public class HiveConf extends Configuration {
HIVE_PROTO_EVENTS_BASE_PATH("hive.hook.proto.base-directory", "",
"Base directory into which the proto event messages are written by HiveProtoLoggingHook."),
- HIVE_PROTO_EVENTS_QUEUE_CAPACITY("hive.hook.proto.queue.capacity", 64,
- "Queue capacity for the proto events logging threads."),
+ HIVE_PROTO_EVENTS_ROLLOVER_CHECK_INTERVAL("hive.hook.proto.rollover-interval", "600s",
+ new TimeValidator(TimeUnit.SECONDS, 0L, true, 3600 * 24L, true),
+ "Frequency at which the file rollover check is triggered."),
HIVE_PROTO_EVENTS_CLEAN_FREQ("hive.hook.proto.events.clean.freq", "1d",
new TimeValidator(TimeUnit.DAYS),
"Frequency at which timer task runs to purge expired proto event files."),
http://git-wip-us.apache.org/repos/asf/hive/blob/a99be34a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index 5a613b8..0a09675 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -93,11 +93,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -162,7 +161,6 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
.collect(Collectors.toSet());
}
- private static final int HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT = 64;
private static final int WAIT_TIME = 5;
public enum EventType {
@@ -182,7 +180,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
private final Clock clock;
private final String logFileName;
private final DatePartitionedLogger<HiveHookEventProto> logger;
- private final ExecutorService logWriter;
+ private final ScheduledExecutorService logWriter;
private int logFileCount = 0;
private ProtoMessageWriter<HiveHookEventProto> writer;
private LocalDate writerDate;
@@ -215,13 +213,14 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
return;
}
- int queueCapacity = conf.getInt(ConfVars.HIVE_PROTO_EVENTS_QUEUE_CAPACITY.varname,
- HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT);
-
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Hive Hook Proto Log Writer %d").build();
- logWriter = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory);
+ logWriter = Executors.newSingleThreadScheduledExecutor(threadFactory);
+
+ long rolloverInterval = conf.getTimeVar(
+ HiveConf.ConfVars.HIVE_PROTO_EVENTS_ROLLOVER_CHECK_INTERVAL, TimeUnit.MICROSECONDS);
+ logWriter.scheduleWithFixedDelay(() -> handleTick(), rolloverInterval, rolloverInterval,
+ TimeUnit.MICROSECONDS);
}
void shutdown() {
@@ -277,29 +276,45 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
}
}
+ private void handleTick() {
+ try {
+ maybeRolloverWriterForDay();
+ } catch (IOException e) {
+ LOG.error("Got IOException while trying to rollover: ", e);
+ }
+ }
+
+ private boolean maybeRolloverWriterForDay() throws IOException {
+ if (writer == null || !logger.getNow().toLocalDate().equals(writerDate)) {
+ if (writer != null) {
+ // Day change over case, reset the logFileCount.
+ logFileCount = 0;
+ IOUtils.closeQuietly(writer);
+ writer = null;
+ }
+ // increment log file count, if creating a new writer.
+ writer = logger.getWriter(logFileName + "_" + ++logFileCount);
+ writerDate = logger.getDateFromDir(writer.getPath().getParent().getName());
+ return true;
+ }
+ return false;
+ }
+
private static final int MAX_RETRIES = 2;
private void writeEvent(HiveHookEventProto event) {
for (int retryCount = 0; retryCount <= MAX_RETRIES; ++retryCount) {
try {
- if (writer == null || !logger.getNow().toLocalDate().equals(writerDate)) {
- if (writer != null) {
- // Day change over case, reset the logFileCount.
- logFileCount = 0;
- IOUtils.closeQuietly(writer);
- }
- // increment log file count, if creating a new writer.
- writer = logger.getWriter(logFileName + "_" + ++logFileCount);
- writerDate = logger.getDateFromDir(writer.getPath().getParent().getName());
- }
- writer.writeProto(event);
if (eventPerFile) {
- if (writer != null) {
- LOG.debug("Event per file enabled. Closing proto event file: {}", writer.getPath());
- IOUtils.closeQuietly(writer);
+ LOG.debug("Event per file enabled. Closing proto event file: {}", writer.getPath());
+ if (!maybeRolloverWriterForDay()) {
+ writer = logger.getWriter(logFileName + "_" + ++logFileCount);
}
- // rollover to next file
- writer = logger.getWriter(logFileName + "_" + ++logFileCount);
+ writer.writeProto(event);
+ IOUtils.closeQuietly(writer);
+ writer = null;
} else {
+ maybeRolloverWriterForDay();
+ writer.writeProto(event);
writer.hflush();
}
return;
@@ -311,6 +326,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
LOG.warn("Error writing proto message for query {}, eventType: {}, retryCount: {}," +
" error: {} ", event.getHiveQueryId(), event.getEventType(), retryCount,
e.getMessage());
+ LOG.trace("Exception", e);
} else {
LOG.error("Error writing proto message for query {}, eventType: {}: ",
event.getHiveQueryId(), event.getEventType(), e);
http://git-wip-us.apache.org/repos/asf/hive/blob/a99be34a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
index a5939fa..450a0b5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
@@ -22,11 +22,14 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
@@ -224,6 +227,40 @@ public class TestHiveProtoLoggingHook {
assertOtherInfo(event, OtherInfoType.PERF, null);
}
+ @Test
+ public void testRolloverFiles() throws Exception {
+ long waitTime = 100;
+ context.setHookType(HookType.PRE_EXEC_HOOK);
+ conf.setTimeDuration(ConfVars.HIVE_PROTO_EVENTS_ROLLOVER_CHECK_INTERVAL.varname, waitTime,
+ TimeUnit.MICROSECONDS);
+ Path path = new Path(tmpFolder);
+ FileSystem fs = path.getFileSystem(conf);
+ AtomicLong time = new AtomicLong();
+ EventLogger evtLogger = new EventLogger(conf, () -> time.get());
+ evtLogger.handle(context);
+ int statusLen = 0;
+ // Loop to ensure that we give some grace for scheduling issues.
+ for (int i = 0; i < 3; ++i) {
+ Thread.sleep(waitTime + 100);
+ statusLen = fs.listStatus(path).length;
+ if (statusLen > 0) {
+ break;
+ }
+ }
+ Assert.assertEquals(1, statusLen);
+
+ // Move to next day and ensure a new file gets created.
+ time.set(24 * 60 * 60 * 1000 + 1000);
+ for (int i = 0; i < 3; ++i) {
+ Thread.sleep(waitTime + 100);
+ statusLen = fs.listStatus(path).length;
+ if (statusLen > 1) {
+ break;
+ }
+ }
+ Assert.assertEquals(2, statusLen);
+ }
+
private ProtoMessageReader<HiveHookEventProto> getTestReader(HiveConf conf, String tmpFolder)
throws IOException {
Path path = new Path(tmpFolder);