Posted to commits@hive.apache.org by se...@apache.org on 2018/07/02 02:07:50 UTC

[02/20] hive git commit: HIVE-20011: Move away from append mode in proto logging hook (Harish JP, reviewed by Anishek Agarwal)

HIVE-20011: Move away from append mode in proto logging hook (Harish JP, reviewed by Anishek Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2b0cb07e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2b0cb07e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2b0cb07e

Branch: refs/heads/master-txnstats
Commit: 2b0cb07e4f549e64ca776024caf9cf0b5ba01176
Parents: 5b2cbb5
Author: Anishek Agarwal <an...@gmail.com>
Authored: Fri Jun 29 15:05:17 2018 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Fri Jun 29 15:05:17 2018 +0530

----------------------------------------------------------------------
 .../hive/ql/hooks/HiveProtoLoggingHook.java     | 24 +++++++++++++++++---
 .../logging/proto/DatePartitionedLogger.java    | 18 +++++++++++----
 .../logging/proto/ProtoMessageReader.java       |  9 +++++---
 .../logging/proto/ProtoMessageWriter.java       | 12 +++++-----
 4 files changed, 46 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2b0cb07e/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index bddca1a..0820bea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -86,6 +86,7 @@ import static org.apache.hadoop.hive.ql.plan.HiveOperation.UNLOCKTABLE;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -100,6 +101,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import org.apache.commons.compress.utils.IOUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
@@ -180,6 +182,9 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
     private final DatePartitionedLogger<HiveHookEventProto> logger;
     private final ExecutorService eventHandler;
     private final ExecutorService logWriter;
+    private int logFileCount = 0;
+    private ProtoMessageWriter<HiveHookEventProto> writer;
+    private LocalDate writerDate;
 
     EventLogger(HiveConf conf, Clock clock) {
       this.clock = clock;
@@ -233,6 +238,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
           LOG.warn("Got interrupted exception while waiting for events to be flushed", e);
         }
       }
+      IOUtils.closeQuietly(writer);
     }
 
     void handle(HookContext hookContext) {
@@ -284,12 +290,24 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
     private static final int MAX_RETRIES = 2;
     private void writeEvent(HiveHookEventProto event) {
       for (int retryCount = 0; retryCount <= MAX_RETRIES; ++retryCount) {
-        try (ProtoMessageWriter<HiveHookEventProto> writer = logger.getWriter(logFileName)) {
+        try {
+          if (writer == null || !logger.getNow().toLocalDate().equals(writerDate)) {
+            if (writer != null) {
+              // Day changed over; reset the logFileCount.
+              logFileCount = 0;
+              IOUtils.closeQuietly(writer);
+            }
+            // Increment the log file count when creating a new writer.
+            writer = logger.getWriter(logFileName + "_" + ++logFileCount);
+            writerDate = logger.getDateFromDir(writer.getPath().getParent().getName());
+          }
           writer.writeProto(event);
-          // This does not work hence, opening and closing file for every event.
-          // writer.hflush();
+          writer.hflush();
           return;
         } catch (IOException e) {
+          // Something went wrong with the writer; close it and retry with a new one.
+          IOUtils.closeQuietly(writer);
+          writer = null;
           if (retryCount < MAX_RETRIES) {
             LOG.warn("Error writing proto message for query {}, eventType: {}, retryCount: {}," +
                 " error: {} ", event.getHiveQueryId(), event.getEventType(), retryCount,

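The hunk above is the heart of the change: instead of opening and closing a writer for every event (the old appendIfExists workaround), the EventLogger now caches one writer, rolls it when logger.getNow() crosses a date boundary, and gives each new writer a fresh "_<n>" suffix so no existing file is ever reopened. A minimal standalone sketch of that rollover pattern, assuming a stand-in EventWriter interface in place of ProtoMessageWriter (RollingWriterSketch, EventWriter, and open() are illustrative names, not part of the patch):

    import java.io.Closeable;
    import java.io.IOException;
    import java.time.LocalDate;

    // Sketch of the keep-open-and-roll pattern used by EventLogger.writeEvent().
    class RollingWriterSketch {
      // Stand-in for ProtoMessageWriter<HiveHookEventProto>.
      interface EventWriter extends Closeable {
        void write(String event) throws IOException;
        void flush() throws IOException;
      }

      private EventWriter writer;    // cached across events, not reopened per event
      private LocalDate writerDate;  // date the current file belongs to
      private int logFileCount = 0;  // file suffix, reset when the day changes

      void writeEvent(String event, String baseName, LocalDate today) throws IOException {
        if (writer == null || !today.equals(writerDate)) {
          if (writer != null) {
            logFileCount = 0;        // day changed over: restart the suffix counter
            writer.close();
          }
          // A fresh "_<n>" suffix per writer means no file is reopened in append mode.
          writer = open(baseName + "_" + ++logFileCount);
          writerDate = today;
        }
        writer.write(event);
        writer.flush();              // replaces the old close-per-event workaround
      }

      // Stand-in for logger.getWriter(fileName).
      EventWriter open(String fileName) throws IOException {
        throw new UnsupportedOperationException("illustrative only");
      }
    }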
http://git-wip-us.apache.org/repos/asf/hive/blob/2b0cb07e/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
index d6a5121..58cec7e 100644
--- a/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
@@ -45,11 +45,14 @@ import com.google.protobuf.Parser;
  * @param <T> The proto message type.
  */
 public class DatePartitionedLogger<T extends MessageLite> {
-  private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class.getName());
+  private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class);
   // Everyone has permission to write, but with sticky set so that delete is restricted.
   // This is required, since the path is same for all users and everyone writes into it.
   private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777);
 
+  // Since the directories have broad permissions, restrict read access on the files.
+  private static final FsPermission FILE_UMASK = FsPermission.createImmutable((short)0066);
+
   private final Parser<T> parser;
   private final Path basePath;
   private final Configuration conf;
@@ -57,11 +60,12 @@ public class DatePartitionedLogger<T extends MessageLite> {
 
   public DatePartitionedLogger(Parser<T> parser, Path baseDir, Configuration conf, Clock clock)
       throws IOException {
-    this.conf = conf;
+    this.conf = new Configuration(conf);
     this.clock = clock;
     this.parser = parser;
     createDirIfNotExists(baseDir);
     this.basePath = baseDir.getFileSystem(conf).resolvePath(baseDir);
+    FsPermission.setUMask(this.conf, FILE_UMASK);
   }
 
   private void createDirIfNotExists(Path path) throws IOException {
@@ -101,6 +105,10 @@ public class DatePartitionedLogger<T extends MessageLite> {
     return new Path(path, fileName);
   }
 
+  public Path getPathForSubdir(String dirName, String fileName) {
+    return new Path(new Path(basePath, dirName), fileName);
+  }
+
   /**
    * Extract the date from the directory name, this should be a directory created by this class.
    */
@@ -144,11 +152,11 @@ public class DatePartitionedLogger<T extends MessageLite> {
    * Returns new or changed files in the given directory. The offsets are used to find
    * changed files.
    */
-  public List<Path> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets)
+  public List<FileStatus> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets)
       throws IOException {
     Path dirPath = new Path(basePath, subDir);
     FileSystem fileSystem = basePath.getFileSystem(conf);
-    List<Path> newFiles = new ArrayList<>();
+    List<FileStatus> newFiles = new ArrayList<>();
     if (!fileSystem.exists(dirPath)) {
       return newFiles;
     }
@@ -157,7 +165,7 @@ public class DatePartitionedLogger<T extends MessageLite> {
       Long offset = currentOffsets.get(fileName);
       // If the offset was never added or offset < fileSize.
       if (offset == null || offset < status.getLen()) {
-        newFiles.add(new Path(dirPath, fileName));
+        newFiles.add(status);
       }
     }
     return newFiles;

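For context on the permission changes above: the directories keep mode 01777 (world-writable with the sticky bit) so every user can write under the shared path, while the new 0066 umask strips group and other read/write bits from created files, leaving them owner-only (0666 & ~0066 = 0600). The constructor copies the Configuration before calling FsPermission.setUMask because setUMask mutates the Configuration it is given. A small sketch of that effect (UmaskSketch is illustrative, not from the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.permission.FsPermission;

    class UmaskSketch {
      public static void main(String[] args) {
        Configuration shared = new Configuration();
        // Copy first: setUMask writes into the Configuration, and the caller's
        // instance should not silently pick up the restrictive umask.
        Configuration copy = new Configuration(shared);
        FsPermission.setUMask(copy, FsPermission.createImmutable((short) 0066));

        int defaultFileMode = 0666;               // typical default file mode
        int effective = defaultFileMode & ~0066;  // umask strips group/other rw bits
        System.out.printf("effective file mode: %o%n", effective); // prints 600
      }
    }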
http://git-wip-us.apache.org/repos/asf/hive/blob/2b0cb07e/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
index 5a3c63a..b56f066 100644
--- a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
@@ -24,19 +24,22 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
 
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
 
 public class ProtoMessageReader<T extends MessageLite> implements Closeable {
   private final Path filePath;
-  private final SequenceFile.Reader reader;
+  private final Reader reader;
   private final ProtoMessageWritable<T> writable;
 
   ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
     this.filePath = filePath;
-    this.reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(filePath));
+    // The writer does not flush the file length during hflush. Passing the length option
+    // lets us read past the stale length in the FileStatus, but a read past the flushed
+    // data throws EOFException instead of returning null.
+    this.reader = new Reader(conf, Reader.file(filePath), Reader.length(Long.MAX_VALUE));
     this.writable = new ProtoMessageWritable<>(parser);
   }
 

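The Reader.length(Long.MAX_VALUE) option above compensates for the writer hflush()ing record data without updating the file length visible in the FileStatus: the reader can consume everything flushed so far, at the cost of an EOFException when it runs past the flushed data, so consumers must treat EOFException as "end of data for now". A hedged sketch of such a consumer loop using the plain SequenceFile API (TailReaderSketch and drain() are illustrative; a real consumer would read values into a ProtoMessageWritable):

    import java.io.EOFException;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.SequenceFile.Reader;

    class TailReaderSketch {
      // Reads whatever has been flushed so far; returns the offset to resume from.
      static long drain(Configuration conf, Path file, long startOffset) throws IOException {
        try (Reader reader = new Reader(conf, Reader.file(file),
            Reader.length(Long.MAX_VALUE))) {  // read past the stale FileStatus length
          if (startOffset > 0) {
            reader.sync(startOffset);          // resume at the next sync mark
          }
          NullWritable key = NullWritable.get();
          long lastGood = reader.getPosition();
          try {
            while (reader.next(key)) {
              // Real code would call reader.getCurrentValue(...) on the record here.
              lastGood = reader.getPosition();
            }
          } catch (EOFException endOfFlushedData) {
            // The file is still being written; stop at the last complete record.
          }
          return lastGood;
        }
      }
    }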
http://git-wip-us.apache.org/repos/asf/hive/blob/2b0cb07e/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
index c746bb6..9c086ef 100644
--- a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
@@ -26,24 +26,24 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Writer;
 
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
 
 public class ProtoMessageWriter<T extends MessageLite> implements Closeable {
   private final Path filePath;
-  private final SequenceFile.Writer writer;
+  private final Writer writer;
   private final ProtoMessageWritable<T> writable;
 
   ProtoMessageWriter(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
     this.filePath = filePath;
     this.writer = SequenceFile.createWriter(
         conf,
-        SequenceFile.Writer.file(filePath),
-        SequenceFile.Writer.keyClass(NullWritable.class),
-        SequenceFile.Writer.valueClass(ProtoMessageWritable.class),
-        SequenceFile.Writer.appendIfExists(true),
-        SequenceFile.Writer.compression(CompressionType.RECORD));
+        Writer.file(filePath),
+        Writer.keyClass(NullWritable.class),
+        Writer.valueClass(ProtoMessageWritable.class),
+        Writer.compression(CompressionType.RECORD));
     this.writable = new ProtoMessageWritable<>(parser);
   }
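
Dropping Writer.appendIfExists(true) above is the "move away from append mode" of the commit title: every writer now targets a fresh, uniquely numbered file, so append semantics are no longer needed and each write path is a plain create followed by hflush(). A short usage sketch under those assumptions (the path and BytesWritable payload are placeholders; the real value class is ProtoMessageWritable):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.SequenceFile.Writer;

    class WriterSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path file = new Path("/tmp/proto-writer-sketch/events_1"); // placeholder path
        try (Writer writer = SequenceFile.createWriter(conf,
            Writer.file(file),
            Writer.keyClass(NullWritable.class),
            Writer.valueClass(BytesWritable.class), // real code uses ProtoMessageWritable
            Writer.compression(CompressionType.RECORD))) {
          byte[] payload = "event-1".getBytes(StandardCharsets.UTF_8);
          writer.append(NullWritable.get(), new BytesWritable(payload));
          // Record data becomes visible to readers; the file length metadata may lag,
          // which is why the reader opens with Reader.length(Long.MAX_VALUE).
          writer.hflush();
        }
      }
    }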