You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2018/06/14 11:11:01 UTC
hive git commit: HIVE-19646: Filesystem closed error in
HiveProtoLoggingHook (Harish JP, reviewed by Anishek Agarwal)
Repository: hive
Updated Branches:
refs/heads/branch-3 abfbd1af3 -> 55bc28540
HIVE-19646: Filesystem closed error in HiveProtoLoggingHook (Harish JP, reviewed by Anishek Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/55bc2854
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/55bc2854
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/55bc2854
Branch: refs/heads/branch-3
Commit: 55bc2854096f7666244a23e1d769a90fcda0863d
Parents: abfbd1a
Author: Anishek Agarwal <an...@gmail.com>
Authored: Thu Jun 14 16:40:48 2018 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Thu Jun 14 16:40:48 2018 +0530
----------------------------------------------------------------------
.../hive/ql/hooks/DatePartitionedLogger.java | 167 -----------------
.../hive/ql/hooks/HiveProtoLoggingHook.java | 32 +++-
.../hive/ql/hooks/ProtoMessageReader.java | 66 -------
.../hive/ql/hooks/ProtoMessageWritable.java | 101 -----------
.../hive/ql/hooks/ProtoMessageWriter.java | 71 --------
.../logging/proto/DatePartitionedLogger.java | 177 +++++++++++++++++++
.../logging/proto/ProtoMessageReader.java | 66 +++++++
.../logging/proto/ProtoMessageWritable.java | 101 +++++++++++
.../logging/proto/ProtoMessageWriter.java | 71 ++++++++
.../dag/history/logging/proto/package-info.java | 23 +++
.../hive/ql/hooks/TestHiveProtoLoggingHook.java | 2 +
11 files changed, 465 insertions(+), 412 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java
deleted file mode 100644
index c9d1b93..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.hooks;
-
-import java.io.IOException;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.yarn.util.Clock;
-
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
-/**
- * Class to create proto reader and writer for a date partitioned directory structure.
- *
- * @param <T> The proto message type.
- */
-public class DatePartitionedLogger<T extends MessageLite> {
- // Everyone has permission to write, but with sticky set so that delete is restricted.
- // This is required, since the path is same for all users and everyone writes into it.
- private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777);
-
- private final Parser<T> parser;
- private final Path basePath;
- private final Configuration conf;
- private final Clock clock;
- private final FileSystem fileSystem;
-
- public DatePartitionedLogger(Parser<T> parser, Path baseDir, Configuration conf, Clock clock)
- throws IOException {
- this.conf = conf;
- this.clock = clock;
- this.parser = parser;
- this.fileSystem = baseDir.getFileSystem(conf);
- if (!fileSystem.exists(baseDir)) {
- fileSystem.mkdirs(baseDir);
- fileSystem.setPermission(baseDir, DIR_PERMISSION);
- }
- this.basePath = fileSystem.resolvePath(baseDir);
- }
-
- /**
- * Creates a writer for the given fileName, with date as today.
- */
- public ProtoMessageWriter<T> getWriter(String fileName) throws IOException {
- Path filePath = getPathForDate(getNow().toLocalDate(), fileName);
- return new ProtoMessageWriter<>(conf, filePath, parser);
- }
-
- /**
- * Creates a reader for the given filePath, no validation is done.
- */
- public ProtoMessageReader<T> getReader(Path filePath) throws IOException {
- return new ProtoMessageReader<>(conf, filePath, parser);
- }
-
- /**
- * Create a path for the given date and fileName. This can be used to create a reader.
- */
- public Path getPathForDate(LocalDate date, String fileName) throws IOException {
- Path path = new Path(basePath, getDirForDate(date));
- if (!fileSystem.exists(path)) {
- fileSystem.mkdirs(path);
- fileSystem.setPermission(path, DIR_PERMISSION);
- }
- return new Path(path, fileName);
- }
-
- /**
- * Extract the date from the directory name, this should be a directory created by this class.
- */
- public LocalDate getDateFromDir(String dirName) {
- if (!dirName.startsWith("date=")) {
- throw new IllegalArgumentException("Invalid directory: "+ dirName);
- }
- return LocalDate.parse(dirName.substring(5), DateTimeFormatter.ISO_LOCAL_DATE);
- }
-
- /**
- * Returns the directory name for a given date.
- */
- public String getDirForDate(LocalDate date) {
- return "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(date);
- }
-
- /**
- * Find next available directory, after the given directory.
- */
- public String getNextDirectory(String currentDir) throws IOException {
- // Fast check, if the next day directory exists return it.
- String nextDate = getDirForDate(getDateFromDir(currentDir).plusDays(1));
- if (fileSystem.exists(new Path(basePath, nextDate))) {
- return nextDate;
- }
- // Have to scan the directory to find min date greater than currentDir.
- String dirName = null;
- for (FileStatus status : fileSystem.listStatus(basePath)) {
- String name = status.getPath().getName();
- // String comparison is good enough, since its of form date=yyyy-MM-dd
- if (name.compareTo(currentDir) > 0 && (dirName == null || name.compareTo(dirName) < 0)) {
- dirName = name;
- }
- }
- return dirName;
- }
-
- /**
- * Returns new or changed files in the given directory. The offsets are used to find
- * changed files.
- */
- public List<Path> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets)
- throws IOException {
- Path dirPath = new Path(basePath, subDir);
- List<Path> newFiles = new ArrayList<>();
- if (!fileSystem.exists(dirPath)) {
- return newFiles;
- }
- for (FileStatus status : fileSystem.listStatus(dirPath)) {
- String fileName = status.getPath().getName();
- Long offset = currentOffsets.get(fileName);
- // If the offset was never added or offset < fileSize.
- if (offset == null || offset < status.getLen()) {
- newFiles.add(new Path(dirPath, fileName));
- }
- }
- return newFiles;
- }
-
- /**
- * Returns the current time, using the underlying clock in UTC time.
- */
- public LocalDateTime getNow() {
- // Use UTC date to ensure reader date is same on all timezones.
- return LocalDateTime.ofEpochSecond(clock.getTime() / 1000, 0, ZoneOffset.UTC);
- }
-
- public Configuration getConfig() {
- return conf;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index 1ae8194..eef6ac9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -118,6 +118,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hive.common.util.ShutdownHookManager;
+import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger;
+import org.apache.tez.dag.history.logging.proto.ProtoMessageWriter;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -279,14 +281,30 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
}
}
+  private static final int MAX_RETRIES = 2;
+
+  /**
+   * Appends {@code event} to today's log file, making up to {@code MAX_RETRIES + 1} attempts.
+   *
+   * <p>Every attempt opens a new writer; the immediate (0 s) first retry relies on the assumption
+   * that the previous FileSystem object was closed and reopening will fix it. IOExceptions are
+   * logged, never propagated. There is no back-off sleep after the last attempt. If the thread is
+   * interrupted while backing off, the interrupt status is restored and the event is dropped.
+   */
private void writeEvent(HiveHookEventProto event) {
- try (ProtoMessageWriter<HiveHookEventProto> writer = logger.getWriter(logFileName)) {
- writer.writeProto(event);
- // This does not work hence, opening and closing file for every event.
- // writer.hflush();
- } catch (IOException e) {
- LOG.error("Error writing proto message for query {}, eventType: {}: ",
- event.getHiveQueryId(), event.getEventType(), e);
+    for (int retryCount = 0; retryCount <= MAX_RETRIES; ++retryCount) {
+      try (ProtoMessageWriter<HiveHookEventProto> writer = logger.getWriter(logFileName)) {
+        writer.writeProto(event);
+        // This does not work hence, opening and closing file for every event.
+        // writer.hflush();
+        return;
+      } catch (IOException e) {
+        if (retryCount >= MAX_RETRIES) {
+          // Out of attempts: log with the full stack trace and stop. Sleeping here would only
+          // delay the caller, since no further attempt follows.
+          LOG.error("Error writing proto message for query {}, eventType: {}: ",
+              event.getHiveQueryId(), event.getEventType(), e);
+          return;
+        }
+        LOG.warn("Error writing proto message for query {}, eventType: {}, retryCount: {}," +
+            " error: {} ", event.getHiveQueryId(), event.getEventType(), retryCount,
+            e.getMessage());
+        try {
+          // 0 seconds, for first retry assuming fs object was closed and open will fix it;
+          // 1 second before the second retry.
+          Thread.sleep(1000L * retryCount * retryCount);
+        } catch (InterruptedException e1) {
+          // Preserve the interrupt for the caller and abandon the event instead of retrying
+          // with the interrupt flag silently cleared.
+          Thread.currentThread().interrupt();
+          LOG.warn("Got interrupted in retry sleep.", e1);
+          return;
+        }
+      }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java
deleted file mode 100644
index 1c4296c..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.hooks;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
-public class ProtoMessageReader<T extends MessageLite> implements Closeable {
- private final Path filePath;
- private final SequenceFile.Reader reader;
- private final ProtoMessageWritable<T> writable;
-
- ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
- this.filePath = filePath;
- this.reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(filePath));
- this.writable = new ProtoMessageWritable<>(parser);
- }
-
- public Path getFilePath() {
- return filePath;
- }
-
- public void setOffset(long offset) throws IOException {
- reader.seek(offset);
- }
-
- public long getOffset() throws IOException {
- return reader.getPosition();
- }
-
- public T readEvent() throws IOException {
- if (!reader.next(NullWritable.get(), writable)) {
- return null;
- }
- return writable.getMessage();
- }
-
- @Override
- public void close() throws IOException {
- reader.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java
deleted file mode 100644
index 61d8449..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.hooks;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.hadoop.io.Writable;
-
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
-public class ProtoMessageWritable<T extends MessageLite> implements Writable {
- private T message;
- private final Parser<T> parser;
- private DataOutputStream dos;
- private CodedOutputStream cos;
- private DataInputStream din;
- private CodedInputStream cin;
-
- ProtoMessageWritable(Parser<T> parser) {
- this.parser = parser;
- }
-
- public T getMessage() {
- return message;
- }
-
- public void setMessage(T message) {
- this.message = message;
- }
-
- private static class DataOutputStream extends OutputStream {
- DataOutput out;
- @Override
- public void write(int b) throws IOException {
- out.write(b);
- }
-
- @Override
- public void write(byte b[], int off, int len) throws IOException {
- out.write(b, off, len);
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- if (dos == null) {
- dos = new DataOutputStream();
- cos = CodedOutputStream.newInstance(dos);
- }
- dos.out = out;
- cos.writeMessageNoTag(message);
- cos.flush();
- }
-
- private static class DataInputStream extends InputStream {
- DataInput in;
- @Override
- public int read() throws IOException {
- try {
- return in.readUnsignedByte();
- } catch (EOFException e) {
- return -1;
- }
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- if (din == null) {
- din = new DataInputStream();
- cin = CodedInputStream.newInstance(din);
- cin.setSizeLimit(Integer.MAX_VALUE);
- }
- din.in = in;
- message = cin.readMessage(parser, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java
deleted file mode 100644
index ed8de93..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.hooks;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
-public class ProtoMessageWriter<T extends MessageLite> implements Closeable {
- private final Path filePath;
- private final SequenceFile.Writer writer;
- private final ProtoMessageWritable<T> writable;
-
- ProtoMessageWriter(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
- this.filePath = filePath;
- this.writer = SequenceFile.createWriter(
- conf,
- SequenceFile.Writer.file(filePath),
- SequenceFile.Writer.keyClass(NullWritable.class),
- SequenceFile.Writer.valueClass(ProtoMessageWritable.class),
- SequenceFile.Writer.appendIfExists(true),
- SequenceFile.Writer.compression(CompressionType.RECORD));
- this.writable = new ProtoMessageWritable<>(parser);
- }
-
- public Path getPath() {
- return filePath;
- }
-
- public long getOffset() throws IOException {
- return writer.getLength();
- }
-
- public void writeProto(T message) throws IOException {
- writable.setMessage(message);
- writer.append(NullWritable.get(), writable);
- }
-
- public void hflush() throws IOException {
- writer.hflush();
- }
-
- @Override
- public void close() throws IOException {
- writer.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
new file mode 100644
index 0000000..d6a5121
--- /dev/null
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.proto;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+/**
+ * Class to create proto reader and writer for a date partitioned directory structure.
+ *
+ * <p>Files are laid out as {@code <baseDir>/date=yyyy-MM-dd/<fileName>}, where the date comes
+ * from the injected {@link Clock}, interpreted in UTC.
+ *
+ * <p>The {@link FileSystem} is obtained from the path and configuration on every call instead of
+ * being cached in a field; a cached instance could be closed underneath this logger
+ * ("Filesystem closed", HIVE-19646).
+ *
+ * @param <T> The proto message type.
+ */
+public class DatePartitionedLogger<T extends MessageLite> {
+  private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class.getName());
+  // Everyone has permission to write, but with sticky set so that delete is restricted.
+  // This is required, since the path is same for all users and everyone writes into it.
+  private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777);
+
+  private final Parser<T> parser;
+  private final Path basePath;
+  private final Configuration conf;
+  private final Clock clock;
+
+  /**
+   * Creates the logger, creating {@code baseDir} (best effort) if it does not exist yet.
+   *
+   * @param parser parser handed to readers and writers
+   * @param baseDir root of the date partitioned directory tree
+   * @param conf configuration used to obtain the FileSystem and to open readers and writers
+   * @param clock source of the current time used by {@link #getWriter(String)}
+   * @throws IOException if the FileSystem cannot be obtained or {@code baseDir} cannot be resolved
+   */
+  public DatePartitionedLogger(Parser<T> parser, Path baseDir, Configuration conf, Clock clock)
+      throws IOException {
+    this.conf = conf;
+    this.clock = clock;
+    this.parser = parser;
+    createDirIfNotExists(baseDir);
+    this.basePath = baseDir.getFileSystem(conf).resolvePath(baseDir);
+  }
+
+  /**
+   * Best-effort creation of {@code path} with {@link #DIR_PERMISSION}. Failures to create the
+   * directory or to set its permission are logged (with the path), not thrown.
+   *
+   * @throws IOException only if the FileSystem for {@code path} cannot be obtained
+   */
+  private void createDirIfNotExists(Path path) throws IOException {
+    FileSystem fileSystem = path.getFileSystem(conf);
+    try {
+      if (!fileSystem.exists(path)) {
+        if (!fileSystem.mkdirs(path)) {
+          LOG.warn("Could not create directory: {}", path);
+          return;
+        }
+        // mkdirs(Path) uses the default permission, so apply the shared 01777 mode explicitly.
+        fileSystem.setPermission(path, DIR_PERMISSION);
+      }
+    } catch (IOException e) {
+      // Ignore this exception, if there is a problem it'll fail when trying to read or write.
+      LOG.warn("Error while trying to create directory or set permission on {}: ", path, e);
+    }
+  }
+
+  /**
+   * Creates a writer for the given fileName, with date as today (UTC, see {@link #getNow()}).
+   * The date directory is created first if it is missing.
+   */
+  public ProtoMessageWriter<T> getWriter(String fileName) throws IOException {
+    Path filePath = getPathForDate(getNow().toLocalDate(), fileName);
+    return new ProtoMessageWriter<>(conf, filePath, parser);
+  }
+
+  /**
+   * Creates a reader for the given filePath, no validation is done.
+   */
+  public ProtoMessageReader<T> getReader(Path filePath) throws IOException {
+    return new ProtoMessageReader<>(conf, filePath, parser);
+  }
+
+  /**
+   * Create a path for the given date and fileName. This can be used to create a reader.
+   * Note that this also creates (best effort) the date directory if it does not exist.
+   */
+  public Path getPathForDate(LocalDate date, String fileName) throws IOException {
+    Path path = new Path(basePath, getDirForDate(date));
+    createDirIfNotExists(path);
+    return new Path(path, fileName);
+  }
+
+  /**
+   * Extract the date from the directory name, this should be a directory created by this class.
+   *
+   * @throws IllegalArgumentException if {@code dirName} does not start with {@code date=}
+   */
+  public LocalDate getDateFromDir(String dirName) {
+    if (!dirName.startsWith("date=")) {
+      throw new IllegalArgumentException("Invalid directory: "+ dirName);
+    }
+    return LocalDate.parse(dirName.substring(5), DateTimeFormatter.ISO_LOCAL_DATE);
+  }
+
+  /**
+   * Returns the directory name for a given date, e.g. {@code date=2018-06-14}.
+   */
+  public String getDirForDate(LocalDate date) {
+    return "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(date);
+  }
+
+  /**
+   * Find next available directory, after the given directory.
+   *
+   * @return the next day's directory if it exists, otherwise the smallest entry name under the
+   *     base path that sorts after {@code currentDir}, or {@code null} if there is none
+   */
+  public String getNextDirectory(String currentDir) throws IOException {
+    // Fast check, if the next day directory exists return it.
+    String nextDate = getDirForDate(getDateFromDir(currentDir).plusDays(1));
+    FileSystem fileSystem = basePath.getFileSystem(conf);
+    if (fileSystem.exists(new Path(basePath, nextDate))) {
+      return nextDate;
+    }
+    // Have to scan the directory to find min date greater than currentDir.
+    String dirName = null;
+    for (FileStatus status : fileSystem.listStatus(basePath)) {
+      String name = status.getPath().getName();
+      // String comparison is good enough, since its of form date=yyyy-MM-dd
+      if (name.compareTo(currentDir) > 0 && (dirName == null || name.compareTo(dirName) < 0)) {
+        dirName = name;
+      }
+    }
+    return dirName;
+  }
+
+  /**
+   * Returns new or changed files in the given directory. The offsets are used to find
+   * changed files: a file is returned when it has no recorded offset or when its current
+   * length is greater than the recorded offset. Returns an empty list if the directory does
+   * not exist.
+   */
+  public List<Path> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets)
+      throws IOException {
+    Path dirPath = new Path(basePath, subDir);
+    FileSystem fileSystem = basePath.getFileSystem(conf);
+    List<Path> newFiles = new ArrayList<>();
+    if (!fileSystem.exists(dirPath)) {
+      return newFiles;
+    }
+    for (FileStatus status : fileSystem.listStatus(dirPath)) {
+      String fileName = status.getPath().getName();
+      Long offset = currentOffsets.get(fileName);
+      // If the offset was never added or offset < fileSize.
+      if (offset == null || offset < status.getLen()) {
+        newFiles.add(new Path(dirPath, fileName));
+      }
+    }
+    return newFiles;
+  }
+
+  /**
+   * Returns the current time, using the underlying clock in UTC time (second precision).
+   */
+  public LocalDateTime getNow() {
+    // Use UTC date to ensure reader date is same on all timezones.
+    return LocalDateTime.ofEpochSecond(clock.getTime() / 1000, 0, ZoneOffset.UTC);
+  }
+
+  /** Returns the configuration this logger was created with. */
+  public Configuration getConfig() {
+    return conf;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
new file mode 100644
index 0000000..5a3c63a
--- /dev/null
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.proto;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+/**
+ * Sequential reader for a SequenceFile whose values are {@link ProtoMessageWritable}s; the keys
+ * are read into {@link NullWritable} and discarded.
+ *
+ * @param <T> The proto message type.
+ */
+public class ProtoMessageReader<T extends MessageLite> implements Closeable {
+  private final Path filePath;
+  private final SequenceFile.Reader reader;
+  // Reused for every record; readFields replaces the message it holds.
+  private final ProtoMessageWritable<T> writable;
+
+  ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
+    this.filePath = filePath;
+    this.reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(filePath));
+    this.writable = new ProtoMessageWritable<>(parser);
+  }
+
+  /** Returns the path this reader was opened on. */
+  public Path getFilePath() {
+    return filePath;
+  }
+
+  /** Seeks the underlying SequenceFile reader to {@code offset}. */
+  public void setOffset(long offset) throws IOException {
+    reader.seek(offset);
+  }
+
+  /** Returns the current position of the underlying SequenceFile reader. */
+  public long getOffset() throws IOException {
+    return reader.getPosition();
+  }
+
+  /**
+   * Reads the next record.
+   *
+   * @return the decoded message, or {@code null} once there are no more records
+   */
+  public T readEvent() throws IOException {
+    final boolean gotRecord = reader.next(NullWritable.get(), writable);
+    return gotRecord ? writable.getMessage() : null;
+  }
+
+  /** Closes the underlying SequenceFile reader. */
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java
new file mode 100644
index 0000000..7a08e20
--- /dev/null
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.proto;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.Writable;
+
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+/**
+ * {@link Writable} holding a single proto message, serialized as a length-delimited record
+ * (see {@link CodedOutputStream#writeMessageNoTag}) and read back with the supplied parser.
+ *
+ * @param <T> The proto message type.
+ */
+public class ProtoMessageWritable<T extends MessageLite> implements Writable {
+  private T message;
+  private final Parser<T> parser;
+  // Stream adapters, created lazily on first use and then reused for every record.
+  private DataOutputStream dos;
+  private CodedOutputStream cos;
+  private DataInputStream din;
+  private CodedInputStream cin;
+
+  ProtoMessageWritable(Parser<T> parser) {
+    this.parser = parser;
+  }
+
+  public T getMessage() {
+    return message;
+  }
+
+  public void setMessage(T message) {
+    this.message = message;
+  }
+
+  /** {@link OutputStream} view of a {@link DataOutput}; the target is re-pointed per record. */
+  private static class DataOutputStream extends OutputStream {
+    DataOutput out;
+    @Override
+    public void write(int b) throws IOException {
+      out.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      out.write(b, off, len);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (dos == null) {
+      dos = new DataOutputStream();
+      cos = CodedOutputStream.newInstance(dos);
+    }
+    dos.out = out;
+    cos.writeMessageNoTag(message);
+    // Flush so no bytes stay buffered in cos when dos.out is re-pointed on the next call.
+    cos.flush();
+  }
+
+  /** {@link InputStream} view of a {@link DataInput} that reports EOF as -1. */
+  private static class DataInputStream extends InputStream {
+    DataInput in;
+    @Override
+    public int read() throws IOException {
+      try {
+        return in.readUnsignedByte();
+      } catch (EOFException e) {
+        return -1;
+      }
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    if (din == null) {
+      din = new DataInputStream();
+      cin = CodedInputStream.newInstance(din);
+      cin.setSizeLimit(Integer.MAX_VALUE);
+    }
+    din.in = in;
+    // cin is reused for every record, so its byte counter keeps growing across calls. Reset it
+    // so the size limit applies to this message alone; otherwise reading more than
+    // Integer.MAX_VALUE bytes in total through one instance fails with a size-limit error.
+    // NOTE(review): cin buffers reads ahead of the message; this assumes `in` holds only this
+    // record's bytes (as a per-record SequenceFile value buffer would) - confirm.
+    cin.resetSizeCounter();
+    message = cin.readMessage(parser, null);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
new file mode 100644
index 0000000..c746bb6
--- /dev/null
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.proto;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+public class ProtoMessageWriter<T extends MessageLite> implements Closeable {
+ private final Path filePath;
+ private final SequenceFile.Writer writer;
+ private final ProtoMessageWritable<T> writable;
+
+ ProtoMessageWriter(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
+ this.filePath = filePath;
+ this.writer = SequenceFile.createWriter(
+ conf,
+ SequenceFile.Writer.file(filePath),
+ SequenceFile.Writer.keyClass(NullWritable.class),
+ SequenceFile.Writer.valueClass(ProtoMessageWritable.class),
+ SequenceFile.Writer.appendIfExists(true),
+ SequenceFile.Writer.compression(CompressionType.RECORD));
+ this.writable = new ProtoMessageWritable<>(parser);
+ }
+
+ public Path getPath() {
+ return filePath;
+ }
+
+ public long getOffset() throws IOException {
+ return writer.getLength();
+ }
+
+ public void writeProto(T message) throws IOException {
+ writable.setMessage(message);
+ writer.append(NullWritable.get(), writable);
+ }
+
+ public void hflush() throws IOException {
+ writer.hflush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java
new file mode 100644
index 0000000..23ed460
--- /dev/null
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Logger code copied from the Tez codebase. It should be removed once we switch to
+ * Tez 0.9.2, at which point we should depend on the Tez libraries for it instead.
+ */
+package org.apache.tez.dag.history.logging.proto;
http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
index 5e117fe..98b73e8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger;
+import org.apache.tez.dag.history.logging.proto.ProtoMessageReader;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;