You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2018/08/16 07:00:09 UTC

hive git commit: BUG-108021 / BUG-108287 / HIVE-20383 : Invalid queue name and synchronisation issues in hive proto events hook. (Harish JP, reviewed by Anishek Agarwal)

Repository: hive
Updated Branches:
  refs/heads/master 649d7c12b -> 05d4e4ebc


BUG-108021 / BUG-108287 / HIVE-20383 : Invalid queue name and synchronisation issues in hive proto events hook. (Harish JP, reviewed by Anishek Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/05d4e4eb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/05d4e4eb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/05d4e4eb

Branch: refs/heads/master
Commit: 05d4e4ebcec1c97e3b4e86e7e1fc0717e5b13d05
Parents: 649d7c1
Author: Anishek Agarwal <an...@gmail.com>
Authored: Thu Aug 16 12:29:58 2018 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Thu Aug 16 12:29:58 2018 +0530

----------------------------------------------------------------------
 .../hive/ql/hooks/HiveProtoLoggingHook.java     | 51 ++++++------
 .../hive/ql/hooks/TestHiveProtoLoggingHook.java | 88 ++++++++++++++++++--
 2 files changed, 109 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/05d4e4eb/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index 155b2be..45e1ab3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -118,10 +118,12 @@ import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry;
 import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
 import org.apache.hadoop.hive.ql.plan.ExplainWork;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hive.common.util.ShutdownHookManager;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger;
 import org.apache.tez.dag.history.logging.proto.ProtoMessageWriter;
 import org.json.JSONObject;
@@ -180,7 +182,6 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
     private final Clock clock;
     private final String logFileName;
     private final DatePartitionedLogger<HiveHookEventProto> logger;
-    private final ExecutorService eventHandler;
     private final ExecutorService logWriter;
     private int logFileCount = 0;
     private ProtoMessageWriter<HiveHookEventProto> writer;
@@ -207,7 +208,6 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       }
       this.logger = tmpLogger;
       if (logger == null) {
-        eventHandler = null;
         logWriter = null;
         return;
       }
@@ -216,25 +216,16 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
           HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT);
 
       ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
-          .setNameFormat("Hive Hook Proto Event Handler %d").build();
-      eventHandler = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
-          new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory);
-
-      threadFactory = new ThreadFactoryBuilder().setDaemon(true)
           .setNameFormat("Hive Hook Proto Log Writer %d").build();
       logWriter = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
           new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory);
     }
 
     void shutdown() {
-      // Wait for all the events to be written off, the order of service is important
-      for (ExecutorService service : new ExecutorService[] {eventHandler, logWriter}) {
-        if (service == null) {
-          continue;
-        }
-        service.shutdown();
+      if (logWriter != null) {
+        logWriter.shutdown();
         try {
-          service.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
+          logWriter.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
           LOG.warn("Got interrupted exception while waiting for events to be flushed", e);
         }
@@ -246,14 +237,9 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       if (logger == null) {
         return;
       }
-      try {
-        eventHandler.execute(() -> generateEvent(hookContext));
-      } catch (RejectedExecutionException e) {
-        LOG.warn("Handler queue full ignoring event: " + hookContext.getHookType());
-      }
-    }
-
-    private void generateEvent(HookContext hookContext) {
+      // Note: same hookContext object is used for all the events for a given query, if we try to
+      // do it async we have concurrency issues and when query cache is enabled, post event comes
+      // before we start the pre hook processing and causes inconsistent events publishing.
       QueryPlan plan = hookContext.getQueryPlan();
       if (plan == null) {
         LOG.debug("Received null query plan.");
@@ -343,7 +329,10 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       builder.setHiveQueryId(plan.getQueryId());
       builder.setUser(getUser(hookContext));
       builder.setRequestUser(getRequestUser(hookContext));
-      builder.setQueue(conf.get("mapreduce.job.queuename"));
+      String queueName = getQueueName(executionMode, conf);
+      if (queueName != null) {
+        builder.setQueue(queueName);
+      }
       builder.setExecutionMode(executionMode.name());
       builder.addAllTablesRead(getTablesFromEntitySet(hookContext.getInputs()));
       builder.addAllTablesWritten(getTablesFromEntitySet(hookContext.getOutputs()));
@@ -385,7 +374,6 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       ApplicationId llapId = determineLlapId(conf, executionMode);
       if (llapId != null) {
         addMapEntry(builder, OtherInfoType.LLAP_APP_ID, llapId.toString());
-        builder.setQueue(conf.get(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname));
       }
 
       conf.stripHiddenConfigurations(conf);
@@ -439,6 +427,21 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       return requestuser;
     }
 
+    private String getQueueName(ExecutionMode mode, HiveConf conf) {
+      switch (mode) {
+      case LLAP:
+        return conf.get(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname);
+      case MR:
+        return conf.get(MRJobConfig.QUEUE_NAME);
+      case TEZ:
+        return conf.get(TezConfiguration.TEZ_QUEUE_NAME);
+      case SPARK:
+      case NONE:
+      default:
+        return null;
+      }
+    }
+
     private List<String> getTablesFromEntitySet(Set<? extends Entity> entities) {
       List<String> tableNames = new ArrayList<>();
       for (Entity entity : entities) {

http://git-wip-us.apache.org/repos/asf/hive/blob/05d4e4eb/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
index 8124528..a5939fa 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.hooks;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -30,15 +29,22 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.EventLogger;
 import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.EventType;
+import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.ExecutionMode;
 import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.OtherInfoType;
 import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
 import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.HiveHookEventProto;
 import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger;
 import org.apache.tez.dag.history.logging.proto.ProtoMessageReader;
 import org.junit.Assert;
@@ -63,6 +69,9 @@ public class TestHiveProtoLoggingHook {
   @Before
   public void setup() throws Exception {
     conf = new HiveConf();
+    conf.set(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, "llap_queue");
+    conf.set(MRJobConfig.QUEUE_NAME, "mr_queue");
+    conf.set(TezConfiguration.TEZ_QUEUE_NAME, "tez_queue");
     tmpFolder = folder.newFolder().getAbsolutePath();
     conf.setVar(HiveConf.ConfVars.HIVE_PROTO_EVENTS_BASE_PATH, tmpFolder);
     QueryState state = new QueryState.Builder().withHiveConf(conf).build();
@@ -94,7 +103,8 @@ public class TestHiveProtoLoggingHook {
     Assert.assertEquals("test_user", event.getRequestUser());
     Assert.assertEquals("test_queryId", event.getHiveQueryId());
     Assert.assertEquals("test_op_id", event.getOperationId());
-    Assert.assertEquals("NONE", event.getExecutionMode());
+    Assert.assertEquals(ExecutionMode.NONE.name(), event.getExecutionMode());
+    Assert.assertFalse(event.hasQueue());
 
     assertOtherInfo(event, OtherInfoType.TEZ, Boolean.FALSE.toString());
     assertOtherInfo(event, OtherInfoType.MAPRED, Boolean.FALSE.toString());
@@ -108,6 +118,69 @@ public class TestHiveProtoLoggingHook {
   }
 
   @Test
+  public void testQueueLogs() throws Exception {
+    context.setHookType(HookType.PRE_EXEC_HOOK);
+    EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance());
+
+    // This makes it MR task
+    context.getQueryPlan().getRootTasks().add(new ExecDriver());
+    evtLogger.handle(context);
+
+    // This makes it Tez task
+    MapWork mapWork = new MapWork();
+    TezWork tezWork = new TezWork("test_queryid");
+    tezWork.add(mapWork);
+    TezTask task = new TezTask();
+    task.setId("id1");
+    task.setWork(tezWork);
+    context.getQueryPlan().getRootTasks().add(task);
+    context.getQueryPlan().getRootTasks().add(new TezTask());
+    evtLogger.handle(context);
+
+    // This makes it llap task
+    mapWork.setLlapMode(true);
+    evtLogger.handle(context);
+
+    evtLogger.shutdown();
+
+    ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder);
+
+    HiveHookEventProto event = reader.readEvent();
+    Assert.assertNotNull(event);
+    Assert.assertEquals(ExecutionMode.MR.name(), event.getExecutionMode());
+    Assert.assertEquals(event.getQueue(), "mr_queue");
+
+    event = reader.readEvent();
+    Assert.assertNotNull(event);
+    Assert.assertEquals(ExecutionMode.TEZ.name(), event.getExecutionMode());
+    Assert.assertEquals(event.getQueue(), "tez_queue");
+
+    event = reader.readEvent();
+    Assert.assertNotNull(event);
+    Assert.assertEquals(ExecutionMode.LLAP.name(), event.getExecutionMode());
+    Assert.assertEquals(event.getQueue(), "llap_queue");
+  }
+
+  @Test
+  public void testPreAndPostEventBoth() throws Exception {
+    context.setHookType(HookType.PRE_EXEC_HOOK);
+    EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance());
+    evtLogger.handle(context);
+    context.setHookType(HookType.POST_EXEC_HOOK);
+    evtLogger.handle(context);
+    evtLogger.shutdown();
+
+    ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder);
+    HiveHookEventProto event = reader.readEvent();
+    Assert.assertNotNull("Pre hook event not found", event);
+    Assert.assertEquals(EventType.QUERY_SUBMITTED.name(), event.getEventType());
+
+    event = reader.readEvent();
+    Assert.assertNotNull("Post hook event not found", event);
+    Assert.assertEquals(EventType.QUERY_COMPLETED.name(), event.getEventType());
+  }
+
+  @Test
   public void testPostEventLog() throws Exception {
     context.setHookType(HookType.POST_EXEC_HOOK);
     context.getPerfLogger().PerfLogBegin("test", "LogTest");
@@ -151,18 +224,21 @@ public class TestHiveProtoLoggingHook {
     assertOtherInfo(event, OtherInfoType.PERF, null);
   }
 
-  private HiveHookEventProto loadEvent(HiveConf conf, String tmpFolder)
-      throws IOException, FileNotFoundException {
+  private ProtoMessageReader<HiveHookEventProto> getTestReader(HiveConf conf, String tmpFolder)
+      throws IOException {
     Path path = new Path(tmpFolder);
     FileSystem fs = path.getFileSystem(conf);
     FileStatus[] status = fs.listStatus(path);
     Assert.assertEquals(1, status.length);
     status = fs.listStatus(status[0].getPath());
     Assert.assertEquals(1, status.length);
-
     DatePartitionedLogger<HiveHookEventProto> logger = new DatePartitionedLogger<>(
         HiveHookEventProto.PARSER, path, conf, SystemClock.getInstance());
-    ProtoMessageReader<HiveHookEventProto> reader = logger.getReader(status[0].getPath());
+    return logger.getReader(status[0].getPath());
+  }
+
+  private HiveHookEventProto loadEvent(HiveConf conf, String tmpFolder) throws IOException {
+    ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder);
     HiveHookEventProto event = reader.readEvent();
     Assert.assertNotNull(event);
     return event;