You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/07/02 02:07:50 UTC
[02/20] hive git commit: HIVE-20011: Move away from append mode in
proto logging hook (Harish JP, reviewed by Anishek Agarwal)
HIVE-20011: Move away from append mode in proto logging hook (Harish JP, reviewed by Anishek Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2b0cb07e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2b0cb07e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2b0cb07e
Branch: refs/heads/master-txnstats
Commit: 2b0cb07e4f549e64ca776024caf9cf0b5ba01176
Parents: 5b2cbb5
Author: Anishek Agarwal <an...@gmail.com>
Authored: Fri Jun 29 15:05:17 2018 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Fri Jun 29 15:05:17 2018 +0530
----------------------------------------------------------------------
.../hive/ql/hooks/HiveProtoLoggingHook.java | 24 +++++++++++++++++---
.../logging/proto/DatePartitionedLogger.java | 18 +++++++++++----
.../logging/proto/ProtoMessageReader.java | 9 +++++---
.../logging/proto/ProtoMessageWriter.java | 12 +++++-----
4 files changed, 46 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2b0cb07e/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index bddca1a..0820bea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -86,6 +86,7 @@ import static org.apache.hadoop.hive.ql.plan.HiveOperation.UNLOCKTABLE;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -100,6 +101,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
@@ -180,6 +182,9 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
private final DatePartitionedLogger<HiveHookEventProto> logger;
private final ExecutorService eventHandler;
private final ExecutorService logWriter;
+ private int logFileCount = 0;
+ private ProtoMessageWriter<HiveHookEventProto> writer;
+ private LocalDate writerDate;
EventLogger(HiveConf conf, Clock clock) {
this.clock = clock;
@@ -233,6 +238,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
LOG.warn("Got interrupted exception while waiting for events to be flushed", e);
}
}
+ IOUtils.closeQuietly(writer);
}
void handle(HookContext hookContext) {
@@ -284,12 +290,24 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
private static final int MAX_RETRIES = 2;
private void writeEvent(HiveHookEventProto event) {
for (int retryCount = 0; retryCount <= MAX_RETRIES; ++retryCount) {
- try (ProtoMessageWriter<HiveHookEventProto> writer = logger.getWriter(logFileName)) {
+ try {
+ if (writer == null || !logger.getNow().toLocalDate().equals(writerDate)) {
+ if (writer != null) {
+ // Day change over case, reset the logFileCount.
+ logFileCount = 0;
+ IOUtils.closeQuietly(writer);
+ }
+ // increment log file count, if creating a new writer.
+ writer = logger.getWriter(logFileName + "_" + ++logFileCount);
+ writerDate = logger.getDateFromDir(writer.getPath().getParent().getName());
+ }
writer.writeProto(event);
- // This does not work hence, opening and closing file for every event.
- // writer.hflush();
+ writer.hflush();
return;
} catch (IOException e) {
+ // Something wrong with writer, lets close and reopen.
+ IOUtils.closeQuietly(writer);
+ writer = null;
if (retryCount < MAX_RETRIES) {
LOG.warn("Error writing proto message for query {}, eventType: {}, retryCount: {}," +
" error: {} ", event.getHiveQueryId(), event.getEventType(), retryCount,
http://git-wip-us.apache.org/repos/asf/hive/blob/2b0cb07e/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
index d6a5121..58cec7e 100644
--- a/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
@@ -45,11 +45,14 @@ import com.google.protobuf.Parser;
* @param <T> The proto message type.
*/
public class DatePartitionedLogger<T extends MessageLite> {
- private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class.getName());
+ private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class);
// Everyone has permission to write, but with sticky set so that delete is restricted.
// This is required, since the path is same for all users and everyone writes into it.
private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777);
+ // Since the directories have broad permissions restrict the file read access.
+ private static final FsPermission FILE_UMASK = FsPermission.createImmutable((short)0066);
+
private final Parser<T> parser;
private final Path basePath;
private final Configuration conf;
@@ -57,11 +60,12 @@ public class DatePartitionedLogger<T extends MessageLite> {
public DatePartitionedLogger(Parser<T> parser, Path baseDir, Configuration conf, Clock clock)
throws IOException {
- this.conf = conf;
+ this.conf = new Configuration(conf);
this.clock = clock;
this.parser = parser;
createDirIfNotExists(baseDir);
this.basePath = baseDir.getFileSystem(conf).resolvePath(baseDir);
+ FsPermission.setUMask(this.conf, FILE_UMASK);
}
private void createDirIfNotExists(Path path) throws IOException {
@@ -101,6 +105,10 @@ public class DatePartitionedLogger<T extends MessageLite> {
return new Path(path, fileName);
}
+ public Path getPathForSubdir(String dirName, String fileName) {
+ return new Path(new Path(basePath, dirName), fileName);
+ }
+
/**
* Extract the date from the directory name, this should be a directory created by this class.
*/
@@ -144,11 +152,11 @@ public class DatePartitionedLogger<T extends MessageLite> {
* Returns new or changed files in the given directory. The offsets are used to find
* changed files.
*/
- public List<Path> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets)
+ public List<FileStatus> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets)
throws IOException {
Path dirPath = new Path(basePath, subDir);
FileSystem fileSystem = basePath.getFileSystem(conf);
- List<Path> newFiles = new ArrayList<>();
+ List<FileStatus> newFiles = new ArrayList<>();
if (!fileSystem.exists(dirPath)) {
return newFiles;
}
@@ -157,7 +165,7 @@ public class DatePartitionedLogger<T extends MessageLite> {
Long offset = currentOffsets.get(fileName);
// If the offset was never added or offset < fileSize.
if (offset == null || offset < status.getLen()) {
- newFiles.add(new Path(dirPath, fileName));
+ newFiles.add(status);
}
}
return newFiles;
http://git-wip-us.apache.org/repos/asf/hive/blob/2b0cb07e/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
index 5a3c63a..b56f066 100644
--- a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
@@ -24,19 +24,22 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
public class ProtoMessageReader<T extends MessageLite> implements Closeable {
private final Path filePath;
- private final SequenceFile.Reader reader;
+ private final Reader reader;
private final ProtoMessageWritable<T> writable;
ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
this.filePath = filePath;
- this.reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(filePath));
+ // The writer does not flush the length during hflush. Using length options lets us read
+ // past length in the FileStatus but it will throw EOFException during a read instead
+ // of returning null.
+ this.reader = new Reader(conf, Reader.file(filePath), Reader.length(Long.MAX_VALUE));
this.writable = new ProtoMessageWritable<>(parser);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2b0cb07e/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
index c746bb6..9c086ef 100644
--- a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
@@ -26,24 +26,24 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Writer;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
public class ProtoMessageWriter<T extends MessageLite> implements Closeable {
private final Path filePath;
- private final SequenceFile.Writer writer;
+ private final Writer writer;
private final ProtoMessageWritable<T> writable;
ProtoMessageWriter(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
this.filePath = filePath;
this.writer = SequenceFile.createWriter(
conf,
- SequenceFile.Writer.file(filePath),
- SequenceFile.Writer.keyClass(NullWritable.class),
- SequenceFile.Writer.valueClass(ProtoMessageWritable.class),
- SequenceFile.Writer.appendIfExists(true),
- SequenceFile.Writer.compression(CompressionType.RECORD));
+ Writer.file(filePath),
+ Writer.keyClass(NullWritable.class),
+ Writer.valueClass(ProtoMessageWritable.class),
+ Writer.compression(CompressionType.RECORD));
this.writable = new ProtoMessageWritable<>(parser);
}