You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2018/08/16 07:02:14 UTC

hive git commit: Revert "BUG-108021 / BUG-108287 / HIVE-20383 : Invalid queue name and synchronisation issues in hive proto events hook. (Harish JP, reviewed by Anishek Agarwal)"

Repository: hive
Updated Branches:
  refs/heads/master 05d4e4ebc -> 076f4d2d7


Revert "BUG-108021 / BUG-108287 / HIVE-20383 : Invalid queue name and synchronisation issues in hive proto events hook. (Harish JP, reviewd by Anishek Agarwal)"

This reverts commit 05d4e4ebcec1c97e3b4e86e7e1fc0717e5b13d05.


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/076f4d2d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/076f4d2d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/076f4d2d

Branch: refs/heads/master
Commit: 076f4d2d7f447b25b5e5d9ff74359a5ed8043945
Parents: 05d4e4e
Author: Anishek Agarwal <an...@gmail.com>
Authored: Thu Aug 16 12:32:01 2018 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Thu Aug 16 12:32:01 2018 +0530

----------------------------------------------------------------------
 .../hive/ql/hooks/HiveProtoLoggingHook.java     | 51 ++++++------
 .../hive/ql/hooks/TestHiveProtoLoggingHook.java | 88 ++------------------
 2 files changed, 30 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/076f4d2d/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index 45e1ab3..155b2be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -118,12 +118,10 @@ import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry;
 import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
 import org.apache.hadoop.hive.ql.plan.ExplainWork;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hive.common.util.ShutdownHookManager;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger;
 import org.apache.tez.dag.history.logging.proto.ProtoMessageWriter;
 import org.json.JSONObject;
@@ -182,6 +180,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
     private final Clock clock;
     private final String logFileName;
     private final DatePartitionedLogger<HiveHookEventProto> logger;
+    private final ExecutorService eventHandler;
     private final ExecutorService logWriter;
     private int logFileCount = 0;
     private ProtoMessageWriter<HiveHookEventProto> writer;
@@ -208,6 +207,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       }
       this.logger = tmpLogger;
       if (logger == null) {
+        eventHandler = null;
         logWriter = null;
         return;
       }
@@ -216,16 +216,25 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
           HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT);
 
       ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+          .setNameFormat("Hive Hook Proto Event Handler %d").build();
+      eventHandler = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
+          new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory);
+
+      threadFactory = new ThreadFactoryBuilder().setDaemon(true)
           .setNameFormat("Hive Hook Proto Log Writer %d").build();
       logWriter = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
           new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory);
     }
 
     void shutdown() {
-      if (logWriter != null) {
-        logWriter.shutdown();
+      // Wait for all the events to be written off, the order of service is important
+      for (ExecutorService service : new ExecutorService[] {eventHandler, logWriter}) {
+        if (service == null) {
+          continue;
+        }
+        service.shutdown();
         try {
-          logWriter.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
+          service.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
           LOG.warn("Got interrupted exception while waiting for events to be flushed", e);
         }
@@ -237,9 +246,14 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       if (logger == null) {
         return;
       }
-      // Note: same hookContext object is used for all the events for a given query, if we try to
-      // do it async we have concurrency issues and when query cache is enabled, post event comes
-      // before we start the pre hook processing and causes inconsistent events publishing.
+      try {
+        eventHandler.execute(() -> generateEvent(hookContext));
+      } catch (RejectedExecutionException e) {
+        LOG.warn("Handler queue full ignoring event: " + hookContext.getHookType());
+      }
+    }
+
+    private void generateEvent(HookContext hookContext) {
       QueryPlan plan = hookContext.getQueryPlan();
       if (plan == null) {
         LOG.debug("Received null query plan.");
@@ -329,10 +343,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       builder.setHiveQueryId(plan.getQueryId());
       builder.setUser(getUser(hookContext));
       builder.setRequestUser(getRequestUser(hookContext));
-      String queueName = getQueueName(executionMode, conf);
-      if (queueName != null) {
-        builder.setQueue(queueName);
-      }
+      builder.setQueue(conf.get("mapreduce.job.queuename"));
       builder.setExecutionMode(executionMode.name());
       builder.addAllTablesRead(getTablesFromEntitySet(hookContext.getInputs()));
       builder.addAllTablesWritten(getTablesFromEntitySet(hookContext.getOutputs()));
@@ -374,6 +385,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       ApplicationId llapId = determineLlapId(conf, executionMode);
       if (llapId != null) {
         addMapEntry(builder, OtherInfoType.LLAP_APP_ID, llapId.toString());
+        builder.setQueue(conf.get(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname));
       }
 
       conf.stripHiddenConfigurations(conf);
@@ -427,21 +439,6 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       return requestuser;
     }
 
-    private String getQueueName(ExecutionMode mode, HiveConf conf) {
-      switch (mode) {
-      case LLAP:
-        return conf.get(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname);
-      case MR:
-        return conf.get(MRJobConfig.QUEUE_NAME);
-      case TEZ:
-        return conf.get(TezConfiguration.TEZ_QUEUE_NAME);
-      case SPARK:
-      case NONE:
-      default:
-        return null;
-      }
-    }
-
     private List<String> getTablesFromEntitySet(Set<? extends Entity> entities) {
       List<String> tableNames = new ArrayList<>();
       for (Entity entity : entities) {

http://git-wip-us.apache.org/repos/asf/hive/blob/076f4d2d/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
index a5939fa..8124528 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.hooks;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -29,22 +30,15 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
-import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.EventLogger;
 import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.EventType;
-import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.ExecutionMode;
 import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.OtherInfoType;
 import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
 import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.HiveHookEventProto;
 import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger;
 import org.apache.tez.dag.history.logging.proto.ProtoMessageReader;
 import org.junit.Assert;
@@ -69,9 +63,6 @@ public class TestHiveProtoLoggingHook {
   @Before
   public void setup() throws Exception {
     conf = new HiveConf();
-    conf.set(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, "llap_queue");
-    conf.set(MRJobConfig.QUEUE_NAME, "mr_queue");
-    conf.set(TezConfiguration.TEZ_QUEUE_NAME, "tez_queue");
     tmpFolder = folder.newFolder().getAbsolutePath();
     conf.setVar(HiveConf.ConfVars.HIVE_PROTO_EVENTS_BASE_PATH, tmpFolder);
     QueryState state = new QueryState.Builder().withHiveConf(conf).build();
@@ -103,8 +94,7 @@ public class TestHiveProtoLoggingHook {
     Assert.assertEquals("test_user", event.getRequestUser());
     Assert.assertEquals("test_queryId", event.getHiveQueryId());
     Assert.assertEquals("test_op_id", event.getOperationId());
-    Assert.assertEquals(ExecutionMode.NONE.name(), event.getExecutionMode());
-    Assert.assertFalse(event.hasQueue());
+    Assert.assertEquals("NONE", event.getExecutionMode());
 
     assertOtherInfo(event, OtherInfoType.TEZ, Boolean.FALSE.toString());
     assertOtherInfo(event, OtherInfoType.MAPRED, Boolean.FALSE.toString());
@@ -118,69 +108,6 @@ public class TestHiveProtoLoggingHook {
   }
 
   @Test
-  public void testQueueLogs() throws Exception {
-    context.setHookType(HookType.PRE_EXEC_HOOK);
-    EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance());
-
-    // This makes it MR task
-    context.getQueryPlan().getRootTasks().add(new ExecDriver());
-    evtLogger.handle(context);
-
-    // This makes it Tez task
-    MapWork mapWork = new MapWork();
-    TezWork tezWork = new TezWork("test_queryid");
-    tezWork.add(mapWork);
-    TezTask task = new TezTask();
-    task.setId("id1");
-    task.setWork(tezWork);
-    context.getQueryPlan().getRootTasks().add(task);
-    context.getQueryPlan().getRootTasks().add(new TezTask());
-    evtLogger.handle(context);
-
-    // This makes it llap task
-    mapWork.setLlapMode(true);
-    evtLogger.handle(context);
-
-    evtLogger.shutdown();
-
-    ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder);
-
-    HiveHookEventProto event = reader.readEvent();
-    Assert.assertNotNull(event);
-    Assert.assertEquals(ExecutionMode.MR.name(), event.getExecutionMode());
-    Assert.assertEquals(event.getQueue(), "mr_queue");
-
-    event = reader.readEvent();
-    Assert.assertNotNull(event);
-    Assert.assertEquals(ExecutionMode.TEZ.name(), event.getExecutionMode());
-    Assert.assertEquals(event.getQueue(), "tez_queue");
-
-    event = reader.readEvent();
-    Assert.assertNotNull(event);
-    Assert.assertEquals(ExecutionMode.LLAP.name(), event.getExecutionMode());
-    Assert.assertEquals(event.getQueue(), "llap_queue");
-  }
-
-  @Test
-  public void testPreAndPostEventBoth() throws Exception {
-    context.setHookType(HookType.PRE_EXEC_HOOK);
-    EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance());
-    evtLogger.handle(context);
-    context.setHookType(HookType.POST_EXEC_HOOK);
-    evtLogger.handle(context);
-    evtLogger.shutdown();
-
-    ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder);
-    HiveHookEventProto event = reader.readEvent();
-    Assert.assertNotNull("Pre hook event not found", event);
-    Assert.assertEquals(EventType.QUERY_SUBMITTED.name(), event.getEventType());
-
-    event = reader.readEvent();
-    Assert.assertNotNull("Post hook event not found", event);
-    Assert.assertEquals(EventType.QUERY_COMPLETED.name(), event.getEventType());
-  }
-
-  @Test
   public void testPostEventLog() throws Exception {
     context.setHookType(HookType.POST_EXEC_HOOK);
     context.getPerfLogger().PerfLogBegin("test", "LogTest");
@@ -224,21 +151,18 @@ public class TestHiveProtoLoggingHook {
     assertOtherInfo(event, OtherInfoType.PERF, null);
   }
 
-  private ProtoMessageReader<HiveHookEventProto> getTestReader(HiveConf conf, String tmpFolder)
-      throws IOException {
+  private HiveHookEventProto loadEvent(HiveConf conf, String tmpFolder)
+      throws IOException, FileNotFoundException {
     Path path = new Path(tmpFolder);
     FileSystem fs = path.getFileSystem(conf);
     FileStatus[] status = fs.listStatus(path);
     Assert.assertEquals(1, status.length);
     status = fs.listStatus(status[0].getPath());
     Assert.assertEquals(1, status.length);
+
     DatePartitionedLogger<HiveHookEventProto> logger = new DatePartitionedLogger<>(
         HiveHookEventProto.PARSER, path, conf, SystemClock.getInstance());
-    return logger.getReader(status[0].getPath());
-  }
-
-  private HiveHookEventProto loadEvent(HiveConf conf, String tmpFolder) throws IOException {
-    ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder);
+    ProtoMessageReader<HiveHookEventProto> reader = logger.getReader(status[0].getPath());
     HiveHookEventProto event = reader.readEvent();
     Assert.assertNotNull(event);
     return event;