Posted to commits@hive.apache.org by ha...@apache.org on 2018/04/28 17:48:25 UTC
[2/4] hive git commit: HIVE-19288: Implement protobuf logging hive
hook. (Harish JP via Ashutosh Chauhan)
http://git-wip-us.apache.org/repos/asf/hive/blob/e08cc6e6/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java
new file mode 100644
index 0000000..c9d1b93
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.util.Clock;
+
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+/**
+ * Class to create proto reader and writer for a date partitioned directory structure.
+ *
+ * @param <T> The proto message type.
+ */
+public class DatePartitionedLogger<T extends MessageLite> {
+ // Everyone has permission to write, but the sticky bit is set so that deletes are restricted.
+ // This is required, since the path is the same for all users and everyone writes into it.
+ private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777);
+
+ private final Parser<T> parser;
+ private final Path basePath;
+ private final Configuration conf;
+ private final Clock clock;
+ private final FileSystem fileSystem;
+
+ public DatePartitionedLogger(Parser<T> parser, Path baseDir, Configuration conf, Clock clock)
+ throws IOException {
+ this.conf = conf;
+ this.clock = clock;
+ this.parser = parser;
+ this.fileSystem = baseDir.getFileSystem(conf);
+ if (!fileSystem.exists(baseDir)) {
+ fileSystem.mkdirs(baseDir);
+ fileSystem.setPermission(baseDir, DIR_PERMISSION);
+ }
+ this.basePath = fileSystem.resolvePath(baseDir);
+ }
+
+ /**
+ * Creates a writer for the given fileName, using today's date.
+ */
+ public ProtoMessageWriter<T> getWriter(String fileName) throws IOException {
+ Path filePath = getPathForDate(getNow().toLocalDate(), fileName);
+ return new ProtoMessageWriter<>(conf, filePath, parser);
+ }
+
+ /**
+ * Creates a reader for the given filePath; no validation is done.
+ */
+ public ProtoMessageReader<T> getReader(Path filePath) throws IOException {
+ return new ProtoMessageReader<>(conf, filePath, parser);
+ }
+
+ /**
+ * Creates a path for the given date and fileName. This can be used to create a reader.
+ */
+ public Path getPathForDate(LocalDate date, String fileName) throws IOException {
+ Path path = new Path(basePath, getDirForDate(date));
+ if (!fileSystem.exists(path)) {
+ fileSystem.mkdirs(path);
+ fileSystem.setPermission(path, DIR_PERMISSION);
+ }
+ return new Path(path, fileName);
+ }
+
+ /**
+ * Extracts the date from the directory name; the directory should be one created by this class.
+ */
+ public LocalDate getDateFromDir(String dirName) {
+ if (!dirName.startsWith("date=")) {
+ throw new IllegalArgumentException("Invalid directory: " + dirName);
+ }
+ return LocalDate.parse(dirName.substring(5), DateTimeFormatter.ISO_LOCAL_DATE);
+ }
+
+ /**
+ * Returns the directory name for a given date.
+ */
+ public String getDirForDate(LocalDate date) {
+ return "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(date);
+ }
+
+ /**
+ * Finds the next available directory after the given directory.
+ */
+ public String getNextDirectory(String currentDir) throws IOException {
+ // Fast check: if the next day's directory exists, return it.
+ String nextDate = getDirForDate(getDateFromDir(currentDir).plusDays(1));
+ if (fileSystem.exists(new Path(basePath, nextDate))) {
+ return nextDate;
+ }
+ // Have to scan the directory to find the min date greater than currentDir.
+ String dirName = null;
+ for (FileStatus status : fileSystem.listStatus(basePath)) {
+ String name = status.getPath().getName();
+ // String comparison is good enough, since it's of the form date=yyyy-MM-dd.
+ if (name.compareTo(currentDir) > 0 && (dirName == null || name.compareTo(dirName) < 0)) {
+ dirName = name;
+ }
+ }
+ return dirName;
+ }
+
+ /**
+ * Returns new or changed files in the given directory. The offsets are used to find
+ * changed files.
+ */
+ public List<Path> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets)
+ throws IOException {
+ Path dirPath = new Path(basePath, subDir);
+ List<Path> newFiles = new ArrayList<>();
+ if (!fileSystem.exists(dirPath)) {
+ return newFiles;
+ }
+ for (FileStatus status : fileSystem.listStatus(dirPath)) {
+ String fileName = status.getPath().getName();
+ Long offset = currentOffsets.get(fileName);
+ // If the offset was never added or offset < fileSize.
+ if (offset == null || offset < status.getLen()) {
+ newFiles.add(new Path(dirPath, fileName));
+ }
+ }
+ return newFiles;
+ }
+
+ /**
+ * Returns the current time from the underlying clock, in UTC.
+ */
+ public LocalDateTime getNow() {
+ // Use UTC date to ensure the reader date is the same in all timezones.
+ return LocalDateTime.ofEpochSecond(clock.getTime() / 1000, 0, ZoneOffset.UTC);
+ }
+
+ public Configuration getConfig() {
+ return conf;
+ }
+}
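
A minimal usage sketch of the class above (not part of the patch; the base directory is
illustrative, and HiveHookEventProto is the message type added later in this commit):

    import java.util.Collections;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.HiveHookEventProto;
    import org.apache.hadoop.yarn.util.SystemClock;

    // Write one event into today's partition, then read back every file for that date.
    DatePartitionedLogger<HiveHookEventProto> logger = new DatePartitionedLogger<>(
        HiveHookEventProto.PARSER, new Path("/tmp/hive/events"),  // illustrative base dir
        new Configuration(), SystemClock.getInstance());
    try (ProtoMessageWriter<HiveHookEventProto> writer = logger.getWriter("hive_demo")) {
      writer.writeProto(HiveHookEventProto.newBuilder().setHiveQueryId("q1").build());
    }
    String dir = logger.getDirForDate(logger.getNow().toLocalDate());  // "date=yyyy-MM-dd"
    for (Path file : logger.scanForChangedFiles(dir, Collections.emptyMap())) {
      try (ProtoMessageReader<HiveHookEventProto> reader = logger.getReader(file)) {
        HiveHookEventProto event;
        while ((event = reader.readEvent()) != null) {
          System.out.println(event.getHiveQueryId() + " " + event.getEventType());
        }
      }
    }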
http://git-wip-us.apache.org/repos/asf/hive/blob/e08cc6e6/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
new file mode 100644
index 0000000..1ae8194
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -0,0 +1,493 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERDATABASE;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERDATABASE_OWNER;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERPARTITION_BUCKETNUM;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERPARTITION_FILEFORMAT;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERPARTITION_LOCATION;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERPARTITION_MERGEFILES;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERPARTITION_SERDEPROPERTIES;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERPARTITION_SERIALIZER;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_ADDCOLS;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_ADDCONSTRAINT;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_ADDPARTS;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_ARCHIVE;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_BUCKETNUM;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_CLUSTER_SORT;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_COMPACT;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_DROPCONSTRAINT;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_DROPPARTS;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_EXCHANGEPARTITION;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_FILEFORMAT;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_LOCATION;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_MERGEFILES;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_PARTCOLTYPE;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_PROPERTIES;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_RENAME;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_RENAMECOL;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_RENAMEPART;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_REPLACECOLS;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_SERDEPROPERTIES;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_SERIALIZER;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_SKEWED;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_TOUCH;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_UNARCHIVE;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_UPDATEPARTSTATS;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_UPDATETABLESTATS;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTBLPART_SKEWED_LOCATION;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERVIEW_AS;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERVIEW_PROPERTIES;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERVIEW_RENAME;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ANALYZE_TABLE;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.CACHE_METADATA;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.CREATEDATABASE;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.CREATEFUNCTION;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.CREATEMACRO;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.CREATEROLE;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.CREATETABLE;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.CREATETABLE_AS_SELECT;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.CREATEVIEW;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.DROPDATABASE;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.DROPFUNCTION;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.DROPMACRO;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.DROPROLE;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.DROPTABLE;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.DROPVIEW;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.DROPVIEW_PROPERTIES;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.EXPORT;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.IMPORT;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.KILL_QUERY;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.LOAD;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.LOCKTABLE;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.MSCK;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.QUERY;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.RELOADFUNCTION;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.TRUNCATETABLE;
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.UNLOCKTABLE;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.ExplainTask;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.HiveHookEventProto;
+import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry;
+import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
+import org.apache.hadoop.hive.ql.plan.ExplainWork;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hive.common.util.ShutdownHookManager;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Logs events from the Hive hook in protobuf-serialized format, partitioned by date.
+ */
+public class HiveProtoLoggingHook implements ExecuteWithHookContext {
+ private static final Logger LOG = LoggerFactory.getLogger(HiveProtoLoggingHook.class.getName());
+ private static final Set<String> includedOperationSet;
+ private static final int VERSION = 1;
+
+ static {
+ // List of operations for which we log events.
+ includedOperationSet = Arrays.stream(new HiveOperation[] { LOAD, EXPORT, IMPORT,
+ CREATEDATABASE, DROPDATABASE, DROPTABLE, MSCK, ALTERTABLE_ADDCOLS, ALTERTABLE_REPLACECOLS,
+ ALTERTABLE_RENAMECOL, ALTERTABLE_RENAMEPART, ALTERTABLE_UPDATEPARTSTATS,
+ ALTERTABLE_UPDATETABLESTATS, ALTERTABLE_RENAME, ALTERTABLE_DROPPARTS, ALTERTABLE_ADDPARTS,
+ ALTERTABLE_TOUCH, ALTERTABLE_ARCHIVE, ALTERTABLE_UNARCHIVE, ALTERTABLE_PROPERTIES,
+ ALTERTABLE_SERIALIZER, ALTERPARTITION_SERIALIZER, ALTERTABLE_SERDEPROPERTIES,
+ ALTERPARTITION_SERDEPROPERTIES, ALTERTABLE_CLUSTER_SORT, ANALYZE_TABLE, CACHE_METADATA,
+ ALTERTABLE_BUCKETNUM, ALTERPARTITION_BUCKETNUM, CREATEFUNCTION, DROPFUNCTION,
+ RELOADFUNCTION, CREATEMACRO, DROPMACRO, CREATEVIEW, DROPVIEW, ALTERVIEW_PROPERTIES,
+ DROPVIEW_PROPERTIES, LOCKTABLE, UNLOCKTABLE, CREATEROLE, DROPROLE, ALTERTABLE_FILEFORMAT,
+ ALTERPARTITION_FILEFORMAT, ALTERTABLE_LOCATION, ALTERPARTITION_LOCATION, CREATETABLE,
+ TRUNCATETABLE, CREATETABLE_AS_SELECT, QUERY, ALTERDATABASE, ALTERDATABASE_OWNER,
+ ALTERTABLE_MERGEFILES, ALTERPARTITION_MERGEFILES, ALTERTABLE_SKEWED,
+ ALTERTBLPART_SKEWED_LOCATION, ALTERTABLE_PARTCOLTYPE, ALTERTABLE_EXCHANGEPARTITION,
+ ALTERTABLE_DROPCONSTRAINT, ALTERTABLE_ADDCONSTRAINT, ALTERVIEW_RENAME, ALTERVIEW_AS,
+ ALTERTABLE_COMPACT, KILL_QUERY })
+ .map(HiveOperation::getOperationName)
+ .collect(Collectors.toSet());
+ }
+
+ public static final String HIVE_EVENTS_BASE_PATH = "hive.hook.proto.base-directory";
+ public static final String HIVE_HOOK_PROTO_QUEUE_CAPACITY = "hive.hook.proto.queue.capacity";
+ public static final int HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT = 64;
+ private static final int WAIT_TIME = 5;
+
+ public enum EventType {
+ QUERY_SUBMITTED, QUERY_COMPLETED
+ }
+
+ public enum OtherInfoType {
+ QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, SESSION_ID, THREAD_NAME, VERSION, CLIENT_IP_ADDRESS,
+ HIVE_ADDRESS, HIVE_INSTANCE_TYPE, CONF, PERF, LLAP_APP_ID
+ }
+
+ public enum ExecutionMode {
+ MR, TEZ, LLAP, SPARK, NONE
+ }
+
+ static class EventLogger {
+ private final Clock clock;
+ private final String logFileName;
+ private final DatePartitionedLogger<HiveHookEventProto> logger;
+ private final ExecutorService eventHandler;
+ private final ExecutorService logWriter;
+
+ EventLogger(HiveConf conf, Clock clock) {
+ this.clock = clock;
+ // randomUUID is slow since it's cryptographically secure; only the first query will pay this cost.
+ this.logFileName = "hive_" + UUID.randomUUID().toString();
+ String baseDir = conf.get(HIVE_EVENTS_BASE_PATH);
+ if (baseDir == null) {
+ LOG.error(HIVE_EVENTS_BASE_PATH + " is not set, logging disabled.");
+ }
+
+ DatePartitionedLogger<HiveHookEventProto> tmpLogger = null;
+ try {
+ if (baseDir != null) {
+ tmpLogger = new DatePartitionedLogger<>(HiveHookEventProto.PARSER, new Path(baseDir),
+ conf, clock);
+ }
+ } catch (IOException e) {
+ LOG.error("Unable to intialize logger, logging disabled.", e);
+ }
+ this.logger = tmpLogger;
+ if (logger == null) {
+ eventHandler = null;
+ logWriter = null;
+ return;
+ }
+
+ int queueCapacity = conf.getInt(HIVE_HOOK_PROTO_QUEUE_CAPACITY,
+ HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT);
+
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Hive Hook Proto Event Handler %d").build();
+ eventHandler = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory);
+
+ threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Hive Hook Proto Log Writer %d").build();
+ logWriter = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory);
+ }
+
+ void shutdown() {
+ // Wait for all the events to be written out; the order of the services is important.
+ for (ExecutorService service : new ExecutorService[] {eventHandler, logWriter}) {
+ if (service == null) {
+ continue;
+ }
+ service.shutdown();
+ try {
+ service.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("Got interrupted exception while waiting for events to be flushed", e);
+ }
+ }
+ }
+
+ void handle(HookContext hookContext) {
+ if (logger == null) {
+ return;
+ }
+ try {
+ eventHandler.execute(() -> generateEvent(hookContext));
+ } catch (RejectedExecutionException e) {
+ LOG.warn("Handler queue full ignoring event: " + hookContext.getHookType());
+ }
+ }
+
+ private void generateEvent(HookContext hookContext) {
+ QueryPlan plan = hookContext.getQueryPlan();
+ if (plan == null) {
+ LOG.debug("Received null query plan.");
+ return;
+ }
+ if (!includedOperationSet.contains(plan.getOperationName())) {
+ LOG.debug("Not logging events of operation type : {}", plan.getOperationName());
+ return;
+ }
+ HiveHookEventProto event;
+ switch (hookContext.getHookType()) {
+ case PRE_EXEC_HOOK:
+ event = getPreHookEvent(hookContext);
+ break;
+ case POST_EXEC_HOOK:
+ event = getPostHookEvent(hookContext, true);
+ break;
+ case ON_FAILURE_HOOK:
+ event = getPostHookEvent(hookContext, false);
+ break;
+ default:
+ LOG.warn("Ignoring event of type: {}", hookContext.getHookType());
+ event = null;
+ }
+ if (event != null) {
+ try {
+ logWriter.execute(() -> writeEvent(event));
+ } catch (RejectedExecutionException e) {
+ LOG.warn("Writer queue full ignoring event {} for query {}",
+ hookContext.getHookType(), plan.getQueryId());
+ }
+ }
+ }
+
+ private void writeEvent(HiveHookEventProto event) {
+ try (ProtoMessageWriter<HiveHookEventProto> writer = logger.getWriter(logFileName)) {
+ writer.writeProto(event);
+ // This does not work; hence we open and close the file for every event.
+ // writer.hflush();
+ } catch (IOException e) {
+ LOG.error("Error writing proto message for query {}, eventType: {}: ",
+ event.getHiveQueryId(), event.getEventType(), e);
+ }
+ }
+
+ private HiveHookEventProto getPreHookEvent(HookContext hookContext) {
+ QueryPlan plan = hookContext.getQueryPlan();
+ LOG.info("Received pre-hook notification for: " + plan.getQueryId());
+
+ // Make a copy so that we do not modify the hookContext conf.
+ HiveConf conf = new HiveConf(hookContext.getConf());
+ List<ExecDriver> mrTasks = Utilities.getMRTasks(plan.getRootTasks());
+ List<TezTask> tezTasks = Utilities.getTezTasks(plan.getRootTasks());
+ ExecutionMode executionMode = getExecutionMode(plan, mrTasks, tezTasks);
+
+ HiveHookEventProto.Builder builder = HiveHookEventProto.newBuilder();
+ builder.setEventType(EventType.QUERY_SUBMITTED.name());
+ builder.setTimestamp(plan.getQueryStartTime());
+ builder.setHiveQueryId(plan.getQueryId());
+ builder.setUser(getUser(hookContext));
+ builder.setRequestUser(getRequestUser(hookContext));
+ builder.setQueue(conf.get("mapreduce.job.queuename", "")); // avoid NPE in setQueue if unset
+ builder.setExecutionMode(executionMode.name());
+ builder.addAllTablesRead(getTablesFromEntitySet(hookContext.getInputs()));
+ builder.addAllTablesWritten(getTablesFromEntitySet(hookContext.getOutputs()));
+ if (hookContext.getOperationId() != null) {
+ builder.setOperationId(hookContext.getOperationId());
+ }
+
+ try {
+ JSONObject queryObj = new JSONObject();
+ queryObj.put("queryText", plan.getQueryStr());
+ queryObj.put("queryPlan", getExplainPlan(plan, conf, hookContext));
+ addMapEntry(builder, OtherInfoType.QUERY, queryObj.toString());
+ } catch (Exception e) {
+ LOG.error("Unexpected exception while serializing json.", e);
+ }
+
+ addMapEntry(builder, OtherInfoType.TEZ, Boolean.toString(tezTasks.size() > 0));
+ addMapEntry(builder, OtherInfoType.MAPRED, Boolean.toString(mrTasks.size() > 0));
+ addMapEntry(builder, OtherInfoType.SESSION_ID, hookContext.getSessionId());
+ String logID = conf.getLogIdVar(hookContext.getSessionId());
+ addMapEntry(builder, OtherInfoType.INVOKER_INFO, logID);
+ addMapEntry(builder, OtherInfoType.THREAD_NAME, hookContext.getThreadId());
+ addMapEntry(builder, OtherInfoType.VERSION, Integer.toString(VERSION));
+ addMapEntry(builder, OtherInfoType.CLIENT_IP_ADDRESS, hookContext.getIpAddress());
+
+ String hiveInstanceAddress = hookContext.getHiveInstanceAddress();
+ if (hiveInstanceAddress == null) {
+ try {
+ hiveInstanceAddress = InetAddress.getLocalHost().getHostAddress();
+ } catch (UnknownHostException e) {
+ LOG.error("Error tyring to get localhost address: ", e);
+ }
+ }
+ addMapEntry(builder, OtherInfoType.HIVE_ADDRESS, hiveInstanceAddress);
+
+ String hiveInstanceType = hookContext.isHiveServerQuery() ? "HS2" : "CLI";
+ addMapEntry(builder, OtherInfoType.HIVE_INSTANCE_TYPE, hiveInstanceType);
+
+ ApplicationId llapId = determineLlapId(conf, executionMode);
+ if (llapId != null) {
+ addMapEntry(builder, OtherInfoType.LLAP_APP_ID, llapId.toString());
+ }
+
+ conf.stripHiddenConfigurations(conf);
+ JSONObject confObj = new JSONObject();
+ for (Map.Entry<String, String> setting : conf) {
+ confObj.put(setting.getKey(), setting.getValue());
+ }
+ addMapEntry(builder, OtherInfoType.CONF, confObj.toString());
+ return builder.build();
+ }
+
+ private HiveHookEventProto getPostHookEvent(HookContext hookContext, boolean success) {
+ QueryPlan plan = hookContext.getQueryPlan();
+ LOG.info("Received post-hook notification for: " + plan.getQueryId());
+
+ HiveHookEventProto.Builder builder = HiveHookEventProto.newBuilder();
+ builder.setEventType(EventType.QUERY_COMPLETED.name());
+ builder.setTimestamp(clock.getTime());
+ builder.setHiveQueryId(plan.getQueryId());
+ builder.setUser(getUser(hookContext));
+ builder.setRequestUser(getRequestUser(hookContext));
+ if (hookContext.getOperationId() != null) {
+ builder.setOperationId(hookContext.getOperationId());
+ }
+ addMapEntry(builder, OtherInfoType.STATUS, Boolean.toString(success));
+ JSONObject perfObj = new JSONObject(hookContext.getPerfLogger().getEndTimes());
+ addMapEntry(builder, OtherInfoType.PERF, perfObj.toString());
+
+ return builder.build();
+ }
+
+ private void addMapEntry(HiveHookEventProto.Builder builder, OtherInfoType key, String value) {
+ if (value != null) {
+ builder.addOtherInfo(
+ MapFieldEntry.newBuilder().setKey(key.name()).setValue(value).build());
+ }
+ }
+
+ private String getUser(HookContext hookContext) {
+ return hookContext.getUgi().getShortUserName();
+ }
+
+ private String getRequestUser(HookContext hookContext) {
+ String requestUser = hookContext.getUserName();
+ if (requestUser == null) {
+ requestUser = hookContext.getUgi().getUserName();
+ }
+ return requestUser;
+ }
+
+ private List<String> getTablesFromEntitySet(Set<? extends Entity> entities) {
+ List<String> tableNames = new ArrayList<>();
+ for (Entity entity : entities) {
+ if (entity.getType() == Entity.Type.TABLE) {
+ tableNames.add(entity.getTable().getDbName() + "." + entity.getTable().getTableName());
+ }
+ }
+ return tableNames;
+ }
+
+ private ExecutionMode getExecutionMode(QueryPlan plan, List<ExecDriver> mrTasks,
+ List<TezTask> tezTasks) {
+ if (tezTasks.size() > 0) {
+ // Need to go in and check if any of the tasks is running in LLAP mode.
+ for (TezTask tezTask : tezTasks) {
+ if (tezTask.getWork().getLlapMode()) {
+ return ExecutionMode.LLAP;
+ }
+ }
+ return ExecutionMode.TEZ;
+ } else if (mrTasks.size() > 0) {
+ return ExecutionMode.MR;
+ } else if (Utilities.getSparkTasks(plan.getRootTasks()).size() > 0) {
+ return ExecutionMode.SPARK;
+ } else {
+ return ExecutionMode.NONE;
+ }
+ }
+
+ private JSONObject getExplainPlan(QueryPlan plan, HiveConf conf, HookContext hookContext)
+ throws Exception {
+ // Get explain plan for the query.
+ ExplainConfiguration config = new ExplainConfiguration();
+ config.setFormatted(true);
+ ExplainWork work = new ExplainWork(null, // resFile
+ null, // pCtx
+ plan.getRootTasks(), // RootTasks
+ plan.getFetchTask(), // FetchTask
+ null, // analyzer
+ config, // explainConfig
+ null // cboInfo
+ );
+ ExplainTask explain = (ExplainTask) TaskFactory.get(work, conf);
+ explain.initialize(hookContext.getQueryState(), plan, null, null);
+ return explain.getJSONPlan(null, work);
+ }
+
+ private ApplicationId determineLlapId(HiveConf conf, ExecutionMode mode) {
+ // Note: for now, LLAP is only supported in Tez tasks. It will never come to MR; other modes
+ // may be added here, although this is only necessary for extra debug information.
+ if (mode == ExecutionMode.LLAP) {
+ // In HS2, the client should have been cached already for the common case.
+ // Otherwise, this may actually introduce delay to compilation for the first query.
+ String hosts = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+ if (hosts != null && !hosts.isEmpty()) {
+ try {
+ return LlapRegistryService.getClient(conf).getApplicationId();
+ } catch (IOException e) {
+ LOG.error("Error trying to get llap instance", e);
+ }
+ } else {
+ LOG.info("Cannot determine LLAP instance on client - service hosts are not set");
+ return null;
+ }
+ }
+ return null;
+ }
+
+ // Singleton using double-checked locking (DCL).
+ private static volatile EventLogger instance;
+ static EventLogger getInstance(HiveConf conf) {
+ if (instance == null) {
+ synchronized (EventLogger.class) {
+ if (instance == null) {
+ instance = new EventLogger(conf, SystemClock.getInstance());
+ ShutdownHookManager.addShutdownHook(instance::shutdown);
+ }
+ }
+ }
+ return instance;
+ }
+ }
+
+ @Override
+ public void run(HookContext hookContext) throws Exception {
+ try {
+ EventLogger logger = EventLogger.getInstance(hookContext.getConf());
+ logger.handle(hookContext);
+ } catch (Exception e) {
+ LOG.error("Got exceptoin while processing event: ", e);
+ }
+ }
+}
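
For reference, enabling the hook presumably goes through Hive's standard hook properties
plus the new base-directory key added by this patch; a sketch with illustrative values:

    import org.apache.hadoop.hive.conf.HiveConf;

    HiveConf conf = new HiveConf();
    // Register for pre-, post-, and failure notifications.
    conf.set("hive.exec.pre.hooks", HiveProtoLoggingHook.class.getName());
    conf.set("hive.exec.post.hooks", HiveProtoLoggingHook.class.getName());
    conf.set("hive.exec.failure.hooks", HiveProtoLoggingHook.class.getName());
    // Required: without it the EventLogger logs an error and disables itself.
    conf.set(HiveProtoLoggingHook.HIVE_EVENTS_BASE_PATH, "/tmp/hive/query-events");  // illustrative
    // Optional: capacity of the in-memory handler/writer queues (default 64).
    conf.setInt(HiveProtoLoggingHook.HIVE_HOOK_PROTO_QUEUE_CAPACITY, 128);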
http://git-wip-us.apache.org/repos/asf/hive/blob/e08cc6e6/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java
new file mode 100644
index 0000000..1c4296c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+public class ProtoMessageReader<T extends MessageLite> implements Closeable {
+ private final Path filePath;
+ private final SequenceFile.Reader reader;
+ private final ProtoMessageWritable<T> writable;
+
+ ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
+ this.filePath = filePath;
+ this.reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(filePath));
+ this.writable = new ProtoMessageWritable<>(parser);
+ }
+
+ public Path getFilePath() {
+ return filePath;
+ }
+
+ public void setOffset(long offset) throws IOException {
+ reader.seek(offset);
+ }
+
+ public long getOffset() throws IOException {
+ return reader.getPosition();
+ }
+
+ public T readEvent() throws IOException {
+ if (!reader.next(NullWritable.get(), writable)) {
+ return null;
+ }
+ return writable.getMessage();
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+}
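
The offset accessors are what make DatePartitionedLogger.scanForChangedFiles usable for
incremental tailing: a consumer persists getOffset() per file and seeks back with
setOffset() on the next pass. A sketch (the offsets map and process() consumer are
hypothetical caller-side state):

    // offsets: Map<String, Long> persisted by the caller between scans.
    long saved = offsets.getOrDefault(file.getName(), 0L);
    try (ProtoMessageReader<HiveHookEventProto> reader = logger.getReader(file)) {
      if (saved > 0) {
        reader.setOffset(saved);  // a position previously returned by getOffset()
      }
      HiveHookEventProto event;
      while ((event = reader.readEvent()) != null) {
        process(event);                                   // hypothetical consumer
        offsets.put(file.getName(), reader.getOffset());  // remember for the next scan
      }
    }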
http://git-wip-us.apache.org/repos/asf/hive/blob/e08cc6e6/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java
new file mode 100644
index 0000000..61d8449
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.Writable;
+
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+public class ProtoMessageWritable<T extends MessageLite> implements Writable {
+ private T message;
+ private final Parser<T> parser;
+ private DataOutputStream dos;
+ private CodedOutputStream cos;
+ private DataInputStream din;
+ private CodedInputStream cin;
+
+ ProtoMessageWritable(Parser<T> parser) {
+ this.parser = parser;
+ }
+
+ public T getMessage() {
+ return message;
+ }
+
+ public void setMessage(T message) {
+ this.message = message;
+ }
+
+ private static class DataOutputStream extends OutputStream {
+ DataOutput out;
+ @Override
+ public void write(int b) throws IOException {
+ out.write(b);
+ }
+
+ @Override
+ public void write(byte b[], int off, int len) throws IOException {
+ out.write(b, off, len);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ if (dos == null) {
+ dos = new DataOutputStream();
+ cos = CodedOutputStream.newInstance(dos);
+ }
+ dos.out = out;
+ cos.writeMessageNoTag(message);
+ cos.flush();
+ }
+
+ private static class DataInputStream extends InputStream {
+ DataInput in;
+ @Override
+ public int read() throws IOException {
+ try {
+ return in.readUnsignedByte();
+ } catch (EOFException e) {
+ return -1;
+ }
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ if (din == null) {
+ din = new DataInputStream();
+ cin = CodedInputStream.newInstance(din);
+ cin.setSizeLimit(Integer.MAX_VALUE);
+ }
+ din.in = in;
+ message = cin.readMessage(parser, null);
+ }
+}
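
Each record is framed by writeMessageNoTag, i.e. a varint length prefix followed by the
message bytes, so arbitrarily sized protos round-trip through a SequenceFile value. A
same-package, test-style sketch of that round trip using Hadoop's in-memory buffers
(same package assumed, since the constructor is package-private):

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    ProtoMessageWritable<HiveHookEventProto> out =
        new ProtoMessageWritable<>(HiveHookEventProto.PARSER);
    out.setMessage(HiveHookEventProto.newBuilder().setHiveQueryId("q1").build());
    DataOutputBuffer buf = new DataOutputBuffer();
    out.write(buf);  // varint length + serialized message bytes

    ProtoMessageWritable<HiveHookEventProto> in =
        new ProtoMessageWritable<>(HiveHookEventProto.PARSER);
    DataInputBuffer dib = new DataInputBuffer();
    dib.reset(buf.getData(), buf.getLength());
    in.readFields(dib);
    assert "q1".equals(in.getMessage().getHiveQueryId());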
http://git-wip-us.apache.org/repos/asf/hive/blob/e08cc6e6/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java
new file mode 100644
index 0000000..ed8de93
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+public class ProtoMessageWriter<T extends MessageLite> implements Closeable {
+ private final Path filePath;
+ private final SequenceFile.Writer writer;
+ private final ProtoMessageWritable<T> writable;
+
+ ProtoMessageWriter(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
+ this.filePath = filePath;
+ this.writer = SequenceFile.createWriter(
+ conf,
+ SequenceFile.Writer.file(filePath),
+ SequenceFile.Writer.keyClass(NullWritable.class),
+ SequenceFile.Writer.valueClass(ProtoMessageWritable.class),
+ SequenceFile.Writer.appendIfExists(true),
+ SequenceFile.Writer.compression(CompressionType.RECORD));
+ this.writable = new ProtoMessageWritable<>(parser);
+ }
+
+ public Path getPath() {
+ return filePath;
+ }
+
+ public long getOffset() throws IOException {
+ return writer.getLength();
+ }
+
+ public void writeProto(T message) throws IOException {
+ writable.setMessage(message);
+ writer.append(NullWritable.get(), writable);
+ }
+
+ public void hflush() throws IOException {
+ writer.hflush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+}
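
Note the appendIfExists(true) option: because hflush() did not work on the append path
(see the commented-out call in writeEvent above), the hook opens and closes a writer per
event, and successive opens keep extending the same per-session file. The resulting
pattern, sketched:

    // One record per open/close cycle; the session file grows across cycles.
    try (ProtoMessageWriter<HiveHookEventProto> writer = logger.getWriter(logFileName)) {
      writer.writeProto(event);  // key: NullWritable, value: ProtoMessageWritable
    }  // close() flushes the record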
http://git-wip-us.apache.org/repos/asf/hive/blob/e08cc6e6/ql/src/protobuf/HiveEvents.proto
----------------------------------------------------------------------
diff --git a/ql/src/protobuf/HiveEvents.proto b/ql/src/protobuf/HiveEvents.proto
new file mode 100644
index 0000000..eab0cc9
--- /dev/null
+++ b/ql/src/protobuf/HiveEvents.proto
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+option java_package = "org.apache.hadoop.hive.ql.hooks.proto";
+option java_outer_classname = "HiveHookEvents";
+
+message MapFieldEntry {
+ optional string key = 1;
+ optional string value = 2;
+}
+
+message HiveHookEventProto {
+ optional string eventType = 1;
+ optional string hiveQueryId = 2;
+ optional int64 timestamp = 3;
+ optional string executionMode = 4;
+ optional string requestUser = 5;
+ optional string queue = 6;
+ optional string user = 7;
+ optional string operationId = 8;
+ repeated string tablesWritten = 9;
+ repeated string tablesRead = 10;
+ repeated MapFieldEntry otherInfo = 50;
+}
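
With no syntax line the file is proto2, so every field is optional or repeated and readers
tolerate missing values. Building an event by hand from the generated classes looks like
this (all values illustrative):

    import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.HiveHookEventProto;
    import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry;

    HiveHookEventProto event = HiveHookEventProto.newBuilder()
        .setEventType("QUERY_SUBMITTED")
        .setHiveQueryId("hive_20180428_0001")      // illustrative id
        .setTimestamp(System.currentTimeMillis())
        .setExecutionMode("TEZ")
        .setRequestUser("alice")
        .addTablesRead("default.src")
        .addTablesWritten("default.dst")
        .addOtherInfo(MapFieldEntry.newBuilder().setKey("VERSION").setValue("1"))
        .build();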
http://git-wip-us.apache.org/repos/asf/hive/blob/e08cc6e6/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
new file mode 100644
index 0000000..5e117fe
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.EventLogger;
+import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.EventType;
+import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.OtherInfoType;
+import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
+import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.HiveHookEventProto;
+import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+
+public class TestHiveProtoLoggingHook {
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ private HiveConf conf;
+ private HookContext context;
+ private String tmpFolder;
+
+ @Before
+ public void setup() throws Exception {
+ conf = new HiveConf();
+ tmpFolder = folder.newFolder().getAbsolutePath();
+ conf.set(HiveProtoLoggingHook.HIVE_EVENTS_BASE_PATH, tmpFolder);
+ QueryState state = new QueryState.Builder().withHiveConf(conf).build();
+ @SuppressWarnings("serial")
+ QueryPlan queryPlan = new QueryPlan(HiveOperation.QUERY) {};
+ queryPlan.setQueryId("test_queryId");
+ queryPlan.setQueryStartTime(1234L);
+ queryPlan.setRootTasks(new ArrayList<>());
+ queryPlan.setInputs(new HashSet<>());
+ queryPlan.setOutputs(new HashSet<>());
+
+ PerfLogger perf = PerfLogger.getPerfLogger(conf, true);
+ context = new HookContext(queryPlan, state, null, "test_user", "192.168.10.10",
+ "hive_addr", "test_op_id", "test_session_id", "test_thread_id", true, perf, null);
+ }
+
+ @Test
+ public void testPreEventLog() throws Exception {
+ context.setHookType(HookType.PRE_EXEC_HOOK);
+ EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance());
+ evtLogger.handle(context);
+ evtLogger.shutdown();
+
+ HiveHookEventProto event = loadEvent(conf, tmpFolder);
+
+ Assert.assertEquals(EventType.QUERY_SUBMITTED.name(), event.getEventType());
+ Assert.assertEquals(1234L, event.getTimestamp());
+ Assert.assertEquals(System.getProperty("user.name"), event.getUser());
+ Assert.assertEquals("test_user", event.getRequestUser());
+ Assert.assertEquals("test_queryId", event.getHiveQueryId());
+ Assert.assertEquals("test_op_id", event.getOperationId());
+ Assert.assertEquals("NONE", event.getExecutionMode());
+
+ assertOtherInfo(event, OtherInfoType.TEZ, Boolean.FALSE.toString());
+ assertOtherInfo(event, OtherInfoType.MAPRED, Boolean.FALSE.toString());
+ assertOtherInfo(event, OtherInfoType.CLIENT_IP_ADDRESS, "192.168.10.10");
+ assertOtherInfo(event, OtherInfoType.SESSION_ID, "test_session_id");
+ assertOtherInfo(event, OtherInfoType.THREAD_NAME, "test_thread_id");
+ assertOtherInfo(event, OtherInfoType.HIVE_INSTANCE_TYPE, "HS2");
+ assertOtherInfo(event, OtherInfoType.HIVE_ADDRESS, "hive_addr");
+ assertOtherInfo(event, OtherInfoType.CONF, null);
+ assertOtherInfo(event, OtherInfoType.QUERY, null);
+ }
+
+ @Test
+ public void testPostEventLog() throws Exception {
+ context.setHookType(HookType.POST_EXEC_HOOK);
+
+ EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance());
+ evtLogger.handle(context);
+ evtLogger.shutdown();
+
+ HiveHookEventProto event = loadEvent(conf, tmpFolder);
+ Assert.assertEquals(EventType.QUERY_COMPLETED.name(), event.getEventType());
+ Assert.assertEquals(System.getProperty("user.name"), event.getUser());
+ Assert.assertEquals("test_user", event.getRequestUser());
+ Assert.assertEquals("test_queryId", event.getHiveQueryId());
+ Assert.assertEquals("test_op_id", event.getOperationId());
+
+ assertOtherInfo(event, OtherInfoType.STATUS, Boolean.TRUE.toString());
+ assertOtherInfo(event, OtherInfoType.PERF, null);
+ }
+
+ @Test
+ public void testFailureEventLog() throws Exception {
+ context.setHookType(HookType.ON_FAILURE_HOOK);
+
+ EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance());
+ evtLogger.handle(context);
+ evtLogger.shutdown();
+
+ HiveHookEventProto event = loadEvent(conf, tmpFolder);
+ Assert.assertEquals(EventType.QUERY_COMPLETED.name(), event.getEventType());
+ Assert.assertEquals(System.getProperty("user.name"), event.getUser());
+ Assert.assertEquals("test_user", event.getRequestUser());
+ Assert.assertEquals("test_queryId", event.getHiveQueryId());
+ Assert.assertEquals("test_op_id", event.getOperationId());
+
+ assertOtherInfo(event, OtherInfoType.STATUS, Boolean.FALSE.toString());
+ assertOtherInfo(event, OtherInfoType.PERF, null);
+ }
+
+ private HiveHookEventProto loadEvent(HiveConf conf, String tmpFolder)
+ throws IOException {
+ Path path = new Path(tmpFolder);
+ FileSystem fs = path.getFileSystem(conf);
+ FileStatus[] status = fs.listStatus(path);
+ Assert.assertEquals(1, status.length);
+ status = fs.listStatus(status[0].getPath());
+ Assert.assertEquals(1, status.length);
+
+ DatePartitionedLogger<HiveHookEventProto> logger = new DatePartitionedLogger<>(
+ HiveHookEventProto.PARSER, path, conf, SystemClock.getInstance());
+ ProtoMessageReader<HiveHookEventProto> reader = logger.getReader(status[0].getPath());
+ HiveHookEventProto event = reader.readEvent();
+ Assert.assertNotNull(event);
+ return event;
+ }
+
+ private void assertOtherInfo(HiveHookEventProto event, OtherInfoType key, String value) {
+ for (MapFieldEntry otherInfo : event.getOtherInfoList()) {
+ if (otherInfo.getKey().equals(key.name())) {
+ if (value != null) {
+ Assert.assertEquals(value, otherInfo.getValue());
+ }
+ return;
+ }
+ }
+ Assert.fail("Cannot find key: " + key);
+ }
+}