You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/10/26 21:12:19 UTC

[69/75] [abbrv] hive git commit: HIVE-20746: HiveProtoHookLogger does not close file at end of day. (Harish JP, reviewed by Anishek Agarwal)

HIVE-20746: HiveProtoHookLogger does not close file at end of day. (Harish JP, reviewed by Anishek Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a99be34a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a99be34a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a99be34a

Branch: refs/heads/master-tez092
Commit: a99be34a0ef668babaae1ef0dfbf200e259bc907
Parents: 3cbc13e
Author: Anishek Agarwal <an...@gmail.com>
Authored: Thu Oct 25 11:18:59 2018 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Thu Oct 25 11:18:59 2018 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  5 +-
 .../hive/ql/hooks/HiveProtoLoggingHook.java     | 68 ++++++++++++--------
 .../hive/ql/hooks/TestHiveProtoLoggingHook.java | 37 +++++++++++
 3 files changed, 82 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a99be34a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ed6d3d8..e226a1f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -643,8 +643,9 @@ public class HiveConf extends Configuration {
 
     HIVE_PROTO_EVENTS_BASE_PATH("hive.hook.proto.base-directory", "",
             "Base directory into which the proto event messages are written by HiveProtoLoggingHook."),
-    HIVE_PROTO_EVENTS_QUEUE_CAPACITY("hive.hook.proto.queue.capacity", 64,
-            "Queue capacity for the proto events logging threads."),
+    HIVE_PROTO_EVENTS_ROLLOVER_CHECK_INTERVAL("hive.hook.proto.rollover-interval", "600s",
+            new TimeValidator(TimeUnit.SECONDS, 0L, true, 3600 * 24L, true),
+            "Frequency at which the file rollover check is triggered."),
     HIVE_PROTO_EVENTS_CLEAN_FREQ("hive.hook.proto.events.clean.freq", "1d",
             new TimeValidator(TimeUnit.DAYS),
             "Frequency at which timer task runs to purge expired proto event files."),

http://git-wip-us.apache.org/repos/asf/hive/blob/a99be34a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index 5a613b8..0a09675 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -93,11 +93,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -162,7 +161,6 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
             .collect(Collectors.toSet());
   }
 
-  private static final int HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT = 64;
   private static final int WAIT_TIME = 5;
 
   public enum EventType {
@@ -182,7 +180,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
     private final Clock clock;
     private final String logFileName;
     private final DatePartitionedLogger<HiveHookEventProto> logger;
-    private final ExecutorService logWriter;
+    private final ScheduledExecutorService logWriter;
     private int logFileCount = 0;
     private ProtoMessageWriter<HiveHookEventProto> writer;
     private LocalDate writerDate;
@@ -215,13 +213,14 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
         return;
       }
 
-      int queueCapacity = conf.getInt(ConfVars.HIVE_PROTO_EVENTS_QUEUE_CAPACITY.varname,
-          HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT);
-
       ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
           .setNameFormat("Hive Hook Proto Log Writer %d").build();
-      logWriter = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
-          new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory);
+      logWriter = Executors.newSingleThreadScheduledExecutor(threadFactory);
+
+      long rolloverInterval = conf.getTimeVar(
+          HiveConf.ConfVars.HIVE_PROTO_EVENTS_ROLLOVER_CHECK_INTERVAL, TimeUnit.MICROSECONDS);
+      logWriter.scheduleWithFixedDelay(() -> handleTick(), rolloverInterval, rolloverInterval,
+          TimeUnit.MICROSECONDS);
     }
 
     void shutdown() {
@@ -277,29 +276,45 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       }
     }
 
+    private void handleTick() {
+      try {
+        maybeRolloverWriterForDay();
+      } catch (IOException e) {
+        LOG.error("Got IOException while trying to rollover: ", e);
+      }
+    }
+
+    private boolean maybeRolloverWriterForDay() throws IOException {
+      if (writer == null || !logger.getNow().toLocalDate().equals(writerDate)) {
+        if (writer != null) {
+          // Day change over case, reset the logFileCount.
+          logFileCount = 0;
+          IOUtils.closeQuietly(writer);
+          writer = null;
+        }
+        // increment log file count, if creating a new writer.
+        writer = logger.getWriter(logFileName + "_" + ++logFileCount);
+        writerDate = logger.getDateFromDir(writer.getPath().getParent().getName());
+        return true;
+      }
+      return false;
+    }
+
     private static final int MAX_RETRIES = 2;
     private void writeEvent(HiveHookEventProto event) {
       for (int retryCount = 0; retryCount <= MAX_RETRIES; ++retryCount) {
         try {
-          if (writer == null || !logger.getNow().toLocalDate().equals(writerDate)) {
-            if (writer != null) {
-              // Day change over case, reset the logFileCount.
-              logFileCount = 0;
-              IOUtils.closeQuietly(writer);
-            }
-            // increment log file count, if creating a new writer.
-            writer = logger.getWriter(logFileName + "_" + ++logFileCount);
-            writerDate = logger.getDateFromDir(writer.getPath().getParent().getName());
-          }
-          writer.writeProto(event);
           if (eventPerFile) {
-            if (writer != null) {
-              LOG.debug("Event per file enabled. Closing proto event file: {}", writer.getPath());
-              IOUtils.closeQuietly(writer);
+            LOG.debug("Event per file enabled. Closing proto event file: {}", writer.getPath());
+            if (!maybeRolloverWriterForDay()) {
+              writer = logger.getWriter(logFileName + "_" + ++logFileCount);
             }
-            // rollover to next file
-            writer = logger.getWriter(logFileName + "_" + ++logFileCount);
+            writer.writeProto(event);
+            IOUtils.closeQuietly(writer);
+            writer = null;
           } else {
+            maybeRolloverWriterForDay();
+            writer.writeProto(event);
             writer.hflush();
           }
           return;
@@ -311,6 +326,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
             LOG.warn("Error writing proto message for query {}, eventType: {}, retryCount: {}," +
                 " error: {} ", event.getHiveQueryId(), event.getEventType(), retryCount,
                 e.getMessage());
+            LOG.trace("Exception", e);
           } else {
             LOG.error("Error writing proto message for query {}, eventType: {}: ",
                 event.getHiveQueryId(), event.getEventType(), e);

http://git-wip-us.apache.org/repos/asf/hive/blob/a99be34a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
index a5939fa..450a0b5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
@@ -22,11 +22,14 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
@@ -224,6 +227,40 @@ public class TestHiveProtoLoggingHook {
     assertOtherInfo(event, OtherInfoType.PERF, null);
   }
 
+  @Test
+  public void testRolloverFiles() throws Exception {
+    long waitTime = 100;
+    context.setHookType(HookType.PRE_EXEC_HOOK);
+    conf.setTimeDuration(ConfVars.HIVE_PROTO_EVENTS_ROLLOVER_CHECK_INTERVAL.varname, waitTime,
+        TimeUnit.MICROSECONDS);
+    Path path = new Path(tmpFolder);
+    FileSystem fs = path.getFileSystem(conf);
+    AtomicLong time = new AtomicLong();
+    EventLogger evtLogger = new EventLogger(conf, () -> time.get());
+    evtLogger.handle(context);
+    int statusLen = 0;
+    // Loop to ensure that we give some grace for scheduling issues.
+    for (int i = 0; i < 3; ++i) {
+      Thread.sleep(waitTime + 100);
+      statusLen = fs.listStatus(path).length;
+      if (statusLen > 0) {
+        break;
+      }
+    }
+    Assert.assertEquals(1, statusLen);
+
+    // Move to next day and ensure a new file gets created.
+    time.set(24 * 60 * 60 * 1000 + 1000);
+    for (int i = 0; i < 3; ++i) {
+      Thread.sleep(waitTime + 100);
+      statusLen = fs.listStatus(path).length;
+      if (statusLen > 1) {
+        break;
+      }
+    }
+    Assert.assertEquals(2, statusLen);
+  }
+
   private ProtoMessageReader<HiveHookEventProto> getTestReader(HiveConf conf, String tmpFolder)
       throws IOException {
     Path path = new Path(tmpFolder);