You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2018/06/14 11:11:01 UTC

hive git commit: HIVE-19646: Filesystem closed error in HiveProtoLoggingHook (Harish JP, reviewed by Anishek Agarwal)

Repository: hive
Updated Branches:
  refs/heads/branch-3 abfbd1af3 -> 55bc28540


HIVE-19646: Filesystem closed error in HiveProtoLoggingHook (Harish JP, reviewed by Anishek Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/55bc2854
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/55bc2854
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/55bc2854

Branch: refs/heads/branch-3
Commit: 55bc2854096f7666244a23e1d769a90fcda0863d
Parents: abfbd1a
Author: Anishek Agarwal <an...@gmail.com>
Authored: Thu Jun 14 16:40:48 2018 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Thu Jun 14 16:40:48 2018 +0530

----------------------------------------------------------------------
 .../hive/ql/hooks/DatePartitionedLogger.java    | 167 -----------------
 .../hive/ql/hooks/HiveProtoLoggingHook.java     |  32 +++-
 .../hive/ql/hooks/ProtoMessageReader.java       |  66 -------
 .../hive/ql/hooks/ProtoMessageWritable.java     | 101 -----------
 .../hive/ql/hooks/ProtoMessageWriter.java       |  71 --------
 .../logging/proto/DatePartitionedLogger.java    | 177 +++++++++++++++++++
 .../logging/proto/ProtoMessageReader.java       |  66 +++++++
 .../logging/proto/ProtoMessageWritable.java     | 101 +++++++++++
 .../logging/proto/ProtoMessageWriter.java       |  71 ++++++++
 .../dag/history/logging/proto/package-info.java |  23 +++
 .../hive/ql/hooks/TestHiveProtoLoggingHook.java |   2 +
 11 files changed, 465 insertions(+), 412 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java
deleted file mode 100644
index c9d1b93..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.hooks;
-
-import java.io.IOException;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.yarn.util.Clock;
-
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
-/**
- * Class to create proto reader and writer for a date partitioned directory structure.
- *
- * @param <T> The proto message type.
- */
-public class DatePartitionedLogger<T extends MessageLite> {
-  // Everyone has permission to write, but with sticky set so that delete is restricted.
-  // This is required, since the path is same for all users and everyone writes into it.
-  private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777);
-
-  private final Parser<T> parser;
-  private final Path basePath;
-  private final Configuration conf;
-  private final Clock clock;
-  private final FileSystem fileSystem;
-
-  public DatePartitionedLogger(Parser<T> parser, Path baseDir, Configuration conf, Clock clock)
-      throws IOException {
-    this.conf = conf;
-    this.clock = clock;
-    this.parser = parser;
-    this.fileSystem = baseDir.getFileSystem(conf);
-    if (!fileSystem.exists(baseDir)) {
-      fileSystem.mkdirs(baseDir);
-      fileSystem.setPermission(baseDir, DIR_PERMISSION);
-    }
-    this.basePath = fileSystem.resolvePath(baseDir);
-  }
-
-  /**
-   * Creates a writer for the given fileName, with date as today.
-   */
-  public ProtoMessageWriter<T> getWriter(String fileName) throws IOException {
-    Path filePath = getPathForDate(getNow().toLocalDate(), fileName);
-    return new ProtoMessageWriter<>(conf, filePath, parser);
-  }
-
-  /**
-   * Creates a reader for the given filePath, no validation is done.
-   */
-  public ProtoMessageReader<T> getReader(Path filePath) throws IOException {
-    return new ProtoMessageReader<>(conf, filePath, parser);
-  }
-
-  /**
-   * Create a path for the given date and fileName. This can be used to create a reader.
-   */
-  public Path getPathForDate(LocalDate date, String fileName) throws IOException {
-    Path path = new Path(basePath, getDirForDate(date));
-    if (!fileSystem.exists(path)) {
-      fileSystem.mkdirs(path);
-      fileSystem.setPermission(path, DIR_PERMISSION);
-    }
-    return new Path(path, fileName);
-  }
-
-  /**
-   * Extract the date from the directory name, this should be a directory created by this class.
-   */
-  public LocalDate getDateFromDir(String dirName) {
-    if (!dirName.startsWith("date=")) {
-      throw new IllegalArgumentException("Invalid directory: "+ dirName);
-    }
-    return LocalDate.parse(dirName.substring(5), DateTimeFormatter.ISO_LOCAL_DATE);
-  }
-
-  /**
-   * Returns the directory name for a given date.
-   */
-  public String getDirForDate(LocalDate date) {
-    return "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(date);
-  }
-
-  /**
-   * Find next available directory, after the given directory.
-   */
-  public String getNextDirectory(String currentDir) throws IOException {
-    // Fast check, if the next day directory exists return it.
-    String nextDate = getDirForDate(getDateFromDir(currentDir).plusDays(1));
-    if (fileSystem.exists(new Path(basePath, nextDate))) {
-      return nextDate;
-    }
-    // Have to scan the directory to find min date greater than currentDir.
-    String dirName = null;
-    for (FileStatus status : fileSystem.listStatus(basePath)) {
-      String name = status.getPath().getName();
-      // String comparison is good enough, since its of form date=yyyy-MM-dd
-      if (name.compareTo(currentDir) > 0 && (dirName == null || name.compareTo(dirName) < 0)) {
-        dirName = name;
-      }
-    }
-    return dirName;
-  }
-
-  /**
-   * Returns new or changed files in the given directory. The offsets are used to find
-   * changed files.
-   */
-  public List<Path> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets)
-      throws IOException {
-    Path dirPath = new Path(basePath, subDir);
-    List<Path> newFiles = new ArrayList<>();
-    if (!fileSystem.exists(dirPath)) {
-      return newFiles;
-    }
-    for (FileStatus status : fileSystem.listStatus(dirPath)) {
-      String fileName = status.getPath().getName();
-      Long offset = currentOffsets.get(fileName);
-      // If the offset was never added or offset < fileSize.
-      if (offset == null || offset < status.getLen()) {
-        newFiles.add(new Path(dirPath, fileName));
-      }
-    }
-    return newFiles;
-  }
-
-  /**
-   * Returns the current time, using the underlying clock in UTC time.
-   */
-  public LocalDateTime getNow() {
-    // Use UTC date to ensure reader date is same on all timezones.
-    return LocalDateTime.ofEpochSecond(clock.getTime() / 1000, 0, ZoneOffset.UTC);
-  }
-
-  public Configuration getConfig() {
-    return conf;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index 1ae8194..eef6ac9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -118,6 +118,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hive.common.util.ShutdownHookManager;
+import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger;
+import org.apache.tez.dag.history.logging.proto.ProtoMessageWriter;
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -279,14 +281,30 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       }
     }
 
+    private static final int MAX_RETRIES = 2;
     private void writeEvent(HiveHookEventProto event) {
-      try (ProtoMessageWriter<HiveHookEventProto> writer = logger.getWriter(logFileName)) {
-        writer.writeProto(event);
-        // This does not work hence, opening and closing file for every event.
-        // writer.hflush();
-      } catch (IOException e) {
-        LOG.error("Error writing proto message for query {}, eventType: {}: ",
-            event.getHiveQueryId(), event.getEventType(), e);
+      // Makes up to MAX_RETRIES + 1 attempts to append the event, opening a fresh writer for
+      // every attempt. Failures are only logged; nothing is propagated to the caller.
+      for (int retryCount = 0; retryCount <= MAX_RETRIES; ++retryCount) {
+        try (ProtoMessageWriter<HiveHookEventProto> writer = logger.getWriter(logFileName)) {
+          writer.writeProto(event);
+          // This does not work hence, opening and closing file for every event.
+          // writer.hflush();
+          return;
+        } catch (IOException e) {
+          if (retryCount >= MAX_RETRIES) {
+            // Final attempt failed: log with the stack trace and give up immediately.
+            // Sleeping here would only block the caller, since no further attempt follows.
+            LOG.error("Error writing proto message for query {}, eventType: {}: ",
+                event.getHiveQueryId(), event.getEventType(), e);
+            return;
+          }
+          LOG.warn("Error writing proto message for query {}, eventType: {}, retryCount: {}," +
+              " error: {} ", event.getHiveQueryId(), event.getEventType(), retryCount,
+              e.getMessage());
+          try {
+            // Quadratic backoff: 0 seconds before the first retry (assuming the fs object was
+            // closed and a re-open will fix it), 1 second before the second retry.
+            Thread.sleep(1000L * retryCount * retryCount);
+          } catch (InterruptedException e1) {
+            LOG.warn("Got interrupted in retry sleep.", e1);
+            // Restore the interrupt status so the owner of this thread can observe it, and
+            // stop retrying instead of silently carrying on.
+            Thread.currentThread().interrupt();
+            return;
+          }
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java
deleted file mode 100644
index 1c4296c..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.hooks;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
-public class ProtoMessageReader<T extends MessageLite> implements Closeable {
-  private final Path filePath;
-  private final SequenceFile.Reader reader;
-  private final ProtoMessageWritable<T> writable;
-
-  ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
-    this.filePath = filePath;
-    this.reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(filePath));
-    this.writable = new ProtoMessageWritable<>(parser);
-  }
-
-  public Path getFilePath() {
-    return filePath;
-  }
-
-  public void setOffset(long offset) throws IOException {
-    reader.seek(offset);
-  }
-
-  public long getOffset() throws IOException {
-    return reader.getPosition();
-  }
-
-  public T readEvent() throws IOException {
-    if (!reader.next(NullWritable.get(), writable)) {
-      return null;
-    }
-    return writable.getMessage();
-  }
-
-  @Override
-  public void close() throws IOException {
-    reader.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java
deleted file mode 100644
index 61d8449..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.hooks;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.hadoop.io.Writable;
-
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
-public class ProtoMessageWritable<T extends MessageLite> implements Writable {
-  private T message;
-  private final Parser<T> parser;
-  private DataOutputStream dos;
-  private CodedOutputStream cos;
-  private DataInputStream din;
-  private CodedInputStream cin;
-
-  ProtoMessageWritable(Parser<T> parser) {
-    this.parser = parser;
-  }
-
-  public T getMessage() {
-    return message;
-  }
-
-  public void setMessage(T message) {
-    this.message = message;
-  }
-
-  private static class DataOutputStream extends OutputStream {
-    DataOutput out;
-    @Override
-    public void write(int b) throws IOException {
-      out.write(b);
-    }
-
-    @Override
-    public void write(byte b[], int off, int len) throws IOException {
-      out.write(b, off, len);
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    if (dos == null) {
-      dos = new DataOutputStream();
-      cos = CodedOutputStream.newInstance(dos);
-    }
-    dos.out = out;
-    cos.writeMessageNoTag(message);
-    cos.flush();
-  }
-
-  private static class DataInputStream extends InputStream {
-    DataInput in;
-    @Override
-    public int read() throws IOException {
-      try {
-        return in.readUnsignedByte();
-      } catch (EOFException e) {
-        return -1;
-      }
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    if (din == null) {
-      din = new DataInputStream();
-      cin = CodedInputStream.newInstance(din);
-      cin.setSizeLimit(Integer.MAX_VALUE);
-    }
-    din.in = in;
-    message = cin.readMessage(parser, null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java
deleted file mode 100644
index ed8de93..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.hooks;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
-public class ProtoMessageWriter<T extends MessageLite> implements Closeable {
-  private final Path filePath;
-  private final SequenceFile.Writer writer;
-  private final ProtoMessageWritable<T> writable;
-
-  ProtoMessageWriter(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
-    this.filePath = filePath;
-    this.writer = SequenceFile.createWriter(
-        conf,
-        SequenceFile.Writer.file(filePath),
-        SequenceFile.Writer.keyClass(NullWritable.class),
-        SequenceFile.Writer.valueClass(ProtoMessageWritable.class),
-        SequenceFile.Writer.appendIfExists(true),
-        SequenceFile.Writer.compression(CompressionType.RECORD));
-    this.writable = new ProtoMessageWritable<>(parser);
-  }
-
-  public Path getPath() {
-    return filePath;
-  }
-
-  public long getOffset() throws IOException {
-    return writer.getLength();
-  }
-
-  public void writeProto(T message) throws IOException {
-    writable.setMessage(message);
-    writer.append(NullWritable.get(), writable);
-  }
-
-  public void hflush() throws IOException {
-    writer.hflush();
-  }
-
-  @Override
-  public void close() throws IOException {
-    writer.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
new file mode 100644
index 0000000..d6a5121
--- /dev/null
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.proto;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+/**
+ * Class to create proto reader and writer for a date partitioned directory structure.
+ *
+ * @param <T> The proto message type.
+ */
+public class DatePartitionedLogger<T extends MessageLite> {
+  private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class.getName());
+  // Everyone has permission to write, but with sticky set so that delete is restricted.
+  // This is required, since the path is same for all users and everyone writes into it.
+  private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777);
+
+  // Parser handed to every reader and writer created by this class.
+  private final Parser<T> parser;
+  // Base directory, as returned by FileSystem.resolvePath in the constructor.
+  private final Path basePath;
+  private final Configuration conf;
+  // Source of "now" used to pick the date directory for writers.
+  private final Clock clock;
+
+  /**
+   * Creates a logger rooted at baseDir. Makes a best-effort attempt to create baseDir with
+   * DIR_PERMISSION and then resolves it to the path stored as the base path.
+   *
+   * Note that no FileSystem instance is kept as a field, it is looked up from the path and
+   * conf whenever it is needed.
+   * NOTE(review): presumably this avoids reusing a FileSystem that was closed elsewhere
+   * (HIVE-19646) - confirm.
+   *
+   * @throws IOException if the file system cannot be obtained or baseDir cannot be resolved.
+   */
+  public DatePartitionedLogger(Parser<T> parser, Path baseDir, Configuration conf, Clock clock)
+      throws IOException {
+    this.conf = conf;
+    this.clock = clock;
+    this.parser = parser;
+    createDirIfNotExists(baseDir);
+    this.basePath = baseDir.getFileSystem(conf).resolvePath(baseDir);
+  }
+
+  /**
+   * Creates the directory with DIR_PERMISSION if it does not exist. An IOException from the
+   * exists, mkdirs or setPermission calls is logged and swallowed, only a failure to obtain
+   * the FileSystem itself is propagated.
+   */
+  private void createDirIfNotExists(Path path) throws IOException {
+    FileSystem fileSystem = path.getFileSystem(conf);
+    try {
+      if (!fileSystem.exists(path)) {
+        fileSystem.mkdirs(path);
+        fileSystem.setPermission(path, DIR_PERMISSION);
+      }
+    } catch (IOException e) {
+      // Ignore this exception, if there is a problem it'll fail when trying to read or write.
+      // The message mentions permissions, but exists() and mkdirs() failures land here too.
+      LOG.warn("Error while trying to set permission: ", e);
+    }
+  }
+
+  /**
+   * Creates a writer for the given fileName, with date as today.
+   * Today is taken from getNow(), and the date directory is created if it is missing.
+   */
+  public ProtoMessageWriter<T> getWriter(String fileName) throws IOException {
+    Path filePath = getPathForDate(getNow().toLocalDate(), fileName);
+    return new ProtoMessageWriter<>(conf, filePath, parser);
+  }
+
+  /**
+   * Creates a reader for the given filePath, no validation is done.
+   */
+  public ProtoMessageReader<T> getReader(Path filePath) throws IOException {
+    return new ProtoMessageReader<>(conf, filePath, parser);
+  }
+
+  /**
+   * Create a path for the given date and fileName. This can be used to create a reader.
+   * Side effect: a best-effort attempt is made to create the date directory if it is missing.
+   */
+  public Path getPathForDate(LocalDate date, String fileName) throws IOException {
+    Path path = new Path(basePath, getDirForDate(date));
+    createDirIfNotExists(path);
+    return new Path(path, fileName);
+  }
+
+  /**
+   * Extract the date from the directory name, this should be a directory created by this class.
+   *
+   * @throws IllegalArgumentException if dirName does not start with "date=".
+   */
+  public LocalDate getDateFromDir(String dirName) {
+    if (!dirName.startsWith("date=")) {
+      throw new IllegalArgumentException("Invalid directory: "+ dirName);
+    }
+    // 5 is the length of the "date=" prefix.
+    return LocalDate.parse(dirName.substring(5), DateTimeFormatter.ISO_LOCAL_DATE);
+  }
+
+  /**
+   * Returns the directory name for a given date.
+   * The name has the form "date=" followed by the ISO local date.
+   */
+  public String getDirForDate(LocalDate date) {
+    return "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(date);
+  }
+
+  /**
+   * Find next available directory, after the given directory.
+   *
+   * @return the name of the next directory, or null if no entry under the base path sorts
+   *         after currentDir.
+   */
+  public String getNextDirectory(String currentDir) throws IOException {
+    // Fast check, if the next day directory exists return it.
+    String nextDate = getDirForDate(getDateFromDir(currentDir).plusDays(1));
+    FileSystem fileSystem = basePath.getFileSystem(conf);
+    if (fileSystem.exists(new Path(basePath, nextDate))) {
+      return nextDate;
+    }
+    // Have to scan the directory to find min date greater than currentDir.
+    // Every entry under the base path is considered, the "date=" prefix is not checked here.
+    String dirName = null;
+    for (FileStatus status : fileSystem.listStatus(basePath)) {
+      String name = status.getPath().getName();
+      // String comparison is good enough, since its of form date=yyyy-MM-dd
+      if (name.compareTo(currentDir) > 0 && (dirName == null || name.compareTo(dirName) < 0)) {
+        dirName = name;
+      }
+    }
+    return dirName;
+  }
+
+  /**
+   * Returns new or changed files in the given directory. The offsets are used to find
+   * changed files.
+   *
+   * @param subDir directory name relative to the base path.
+   * @param currentOffsets last known offset per file, keyed by file name (not full path).
+   * @return files with no recorded offset or with a recorded offset smaller than the current
+   *         file length, an empty list if the directory does not exist.
+   */
+  public List<Path> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets)
+      throws IOException {
+    Path dirPath = new Path(basePath, subDir);
+    FileSystem fileSystem = basePath.getFileSystem(conf);
+    List<Path> newFiles = new ArrayList<>();
+    if (!fileSystem.exists(dirPath)) {
+      return newFiles;
+    }
+    for (FileStatus status : fileSystem.listStatus(dirPath)) {
+      String fileName = status.getPath().getName();
+      Long offset = currentOffsets.get(fileName);
+      // If the offset was never added or offset < fileSize.
+      if (offset == null || offset < status.getLen()) {
+        newFiles.add(new Path(dirPath, fileName));
+      }
+    }
+    return newFiles;
+  }
+
+  /**
+   * Returns the current time, using the underlying clock in UTC time.
+   * The clock value is treated as epoch milliseconds and truncated to whole seconds.
+   */
+  public LocalDateTime getNow() {
+    // Use UTC date to ensure reader date is same on all timezones.
+    return LocalDateTime.ofEpochSecond(clock.getTime() / 1000, 0, ZoneOffset.UTC);
+  }
+
+  /**
+   * Returns the configuration this logger was created with.
+   */
+  public Configuration getConfig() {
+    return conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
new file mode 100644
index 0000000..5a3c63a
--- /dev/null
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.proto;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+/**
+ * Reads proto messages one at a time from a SequenceFile whose values are
+ * ProtoMessageWritable records. The underlying reader is opened in the constructor and
+ * released by close().
+ *
+ * @param <T> The proto message type.
+ */
+public class ProtoMessageReader<T extends MessageLite> implements Closeable {
+  private final Path filePath;
+  private final SequenceFile.Reader reader;
+  // Single writable instance that is reused for every readEvent() call.
+  private final ProtoMessageWritable<T> writable;
+
+  /**
+   * Opens filePath for reading. Package-private, DatePartitionedLogger.getReader creates
+   * instances.
+   *
+   * @throws IOException if the sequence file reader cannot be opened.
+   */
+  ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
+    this.filePath = filePath;
+    this.reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(filePath));
+    this.writable = new ProtoMessageWritable<>(parser);
+  }
+
+  /**
+   * Returns the path of the file being read.
+   */
+  public Path getFilePath() {
+    return filePath;
+  }
+
+  /**
+   * Seeks the underlying reader to the given position.
+   * NOTE(review): presumably offset must be a value previously returned by getOffset() or by
+   * the writer's getOffset() - confirm against SequenceFile.Reader.seek.
+   */
+  public void setOffset(long offset) throws IOException {
+    reader.seek(offset);
+  }
+
+  /**
+   * Returns the current position of the underlying reader.
+   */
+  public long getOffset() throws IOException {
+    return reader.getPosition();
+  }
+
+  /**
+   * Reads the next message from the file.
+   *
+   * @return the message, or null when the underlying reader has no more records.
+   */
+  public T readEvent() throws IOException {
+    if (!reader.next(NullWritable.get(), writable)) {
+      return null;
+    }
+    return writable.getMessage();
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java
new file mode 100644
index 0000000..7a08e20
--- /dev/null
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.proto;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.Writable;
+
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+/**
+ * Hadoop {@link Writable} that carries a single protobuf message of type {@code T}.
+ *
+ * <p>The coded input/output streams are created lazily on first use and then reused
+ * for every subsequent call; only the {@link DataInput}/{@link DataOutput} they
+ * delegate to is swapped. The instance therefore holds mutable state and performs
+ * no synchronization of its own.
+ *
+ * @param <T> protobuf message type being serialized
+ */
+public class ProtoMessageWritable<T extends MessageLite> implements Writable {
+  private T message;
+  private final Parser<T> parser;
+  private DataOutputStream dos;
+  private CodedOutputStream cos;
+  private DataInputStream din;
+  private CodedInputStream cin;
+
+  /**
+   * @param parser protobuf parser used by {@link #readFields(DataInput)} to decode messages
+   */
+  ProtoMessageWritable(Parser<T> parser) {
+    this.parser = parser;
+  }
+
+  /** @return the message last set or last read, or {@code null} if there is none yet */
+  public T getMessage() {
+    return message;
+  }
+
+  /** @param message the message to be written by the next {@link #write(DataOutput)} call */
+  public void setMessage(T message) {
+    this.message = message;
+  }
+
+  /** Adapts a {@link DataOutput} to an {@link OutputStream} so protobuf can write to it. */
+  private static class DataOutputStream extends OutputStream {
+    DataOutput out;
+    @Override
+    public void write(int b) throws IOException {
+      out.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      out.write(b, off, len);
+    }
+  }
+
+  /**
+   * Serializes the current message to {@code out} via
+   * {@link CodedOutputStream#writeMessageNoTag}, then flushes the coded stream so
+   * nothing is left buffered when the target is swapped on the next call.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (dos == null) {
+      dos = new DataOutputStream();
+      cos = CodedOutputStream.newInstance(dos);
+    }
+    dos.out = out;
+    cos.writeMessageNoTag(message);
+    cos.flush();
+  }
+
+  /**
+   * Adapts a {@link DataInput} to an {@link InputStream} so protobuf can read from it.
+   * End of input, signalled by {@link EOFException}, is translated to {@code -1}.
+   */
+  private static class DataInputStream extends InputStream {
+    DataInput in;
+    @Override
+    public int read() throws IOException {
+      try {
+        return in.readUnsignedByte();
+      } catch (EOFException e) {
+        return -1;
+      }
+    }
+  }
+
+  /**
+   * Reads one message from {@code in} and stores it as the current message.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    if (din == null) {
+      din = new DataInputStream();
+      cin = CodedInputStream.newInstance(din);
+      cin.setSizeLimit(Integer.MAX_VALUE);
+    }
+    din.in = in;
+    // The coded stream is shared by every record read through this writable, so its
+    // byte counter would otherwise keep accumulating across messages and eventually
+    // trip (or overflow past) the size limit. Restart the count for each message.
+    cin.resetSizeCounter();
+    message = cin.readMessage(parser, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
new file mode 100644
index 0000000..c746bb6
--- /dev/null
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.proto;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+public class ProtoMessageWriter<T extends MessageLite> implements Closeable {
+  private final Path filePath;
+  private final SequenceFile.Writer writer;
+  private final ProtoMessageWritable<T> writable;
+
+  ProtoMessageWriter(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
+    this.filePath = filePath;
+    this.writer = SequenceFile.createWriter(
+        conf,
+        SequenceFile.Writer.file(filePath),
+        SequenceFile.Writer.keyClass(NullWritable.class),
+        SequenceFile.Writer.valueClass(ProtoMessageWritable.class),
+        SequenceFile.Writer.appendIfExists(true),
+        SequenceFile.Writer.compression(CompressionType.RECORD));
+    this.writable = new ProtoMessageWritable<>(parser);
+  }
+
+  public Path getPath() {
+    return filePath;
+  }
+
+  public long getOffset() throws IOException {
+    return writer.getLength();
+  }
+
+  public void writeProto(T message) throws IOException {
+    writable.setMessage(message);
+    writer.append(NullWritable.get(), writable);
+  }
+
+  public void hflush() throws IOException {
+    writer.hflush();
+  }
+
+  @Override
+  public void close() throws IOException {
+    writer.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java
new file mode 100644
index 0000000..23ed460
--- /dev/null
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java
@@ -0,0 +1,23 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * Logger code copied from the Tez codebase. This should be removed when we switch
+ * to Tez version 0.9.2, at which point we should depend on the Tez libraries instead.
+ */
+package org.apache.tez.dag.history.logging.proto;

http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
index 5e117fe..98b73e8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger;
+import org.apache.tez.dag.history.logging.proto.ProtoMessageReader;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;