You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2018/08/16 07:03:17 UTC
hive git commit: HIVE-20383: Invalid queue name and synchronisation
issues in hive proto events hook.(Harish JP, reviewed by Anishek Agarwal)
Repository: hive
Updated Branches:
refs/heads/master 076f4d2d7 -> 463a51257
HIVE-20383: Invalid queue name and synchronisation issues in hive proto events hook.(Harish JP, reviewed by Anishek Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/463a5125
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/463a5125
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/463a5125
Branch: refs/heads/master
Commit: 463a51257dfd31b212d0d1a4cd9805abc86493f6
Parents: 076f4d2
Author: Anishek Agarwal <an...@gmail.com>
Authored: Thu Aug 16 12:32:57 2018 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Thu Aug 16 12:32:57 2018 +0530
----------------------------------------------------------------------
.../hive/ql/hooks/HiveProtoLoggingHook.java | 51 ++++++------
.../hive/ql/hooks/TestHiveProtoLoggingHook.java | 88 ++++++++++++++++++--
2 files changed, 109 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/463a5125/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index 155b2be..45e1ab3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -118,10 +118,12 @@ import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry;
import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
import org.apache.hadoop.hive.ql.plan.ExplainWork;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hive.common.util.ShutdownHookManager;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger;
import org.apache.tez.dag.history.logging.proto.ProtoMessageWriter;
import org.json.JSONObject;
@@ -180,7 +182,6 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
private final Clock clock;
private final String logFileName;
private final DatePartitionedLogger<HiveHookEventProto> logger;
- private final ExecutorService eventHandler;
private final ExecutorService logWriter;
private int logFileCount = 0;
private ProtoMessageWriter<HiveHookEventProto> writer;
@@ -207,7 +208,6 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
}
this.logger = tmpLogger;
if (logger == null) {
- eventHandler = null;
logWriter = null;
return;
}
@@ -216,25 +216,16 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT);
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("Hive Hook Proto Event Handler %d").build();
- eventHandler = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory);
-
- threadFactory = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Hive Hook Proto Log Writer %d").build();
logWriter = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory);
}
void shutdown() {
- // Wait for all the events to be written off, the order of service is important
- for (ExecutorService service : new ExecutorService[] {eventHandler, logWriter}) {
- if (service == null) {
- continue;
- }
- service.shutdown();
+ if (logWriter != null) {
+ logWriter.shutdown();
try {
- service.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
+ logWriter.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.warn("Got interrupted exception while waiting for events to be flushed", e);
}
@@ -246,14 +237,9 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
if (logger == null) {
return;
}
- try {
- eventHandler.execute(() -> generateEvent(hookContext));
- } catch (RejectedExecutionException e) {
- LOG.warn("Handler queue full ignoring event: " + hookContext.getHookType());
- }
- }
-
- private void generateEvent(HookContext hookContext) {
+ // Note: same hookContext object is used for all the events for a given query, if we try to
+ // do it async we have concurrency issues and when query cache is enabled, post event comes
+ // before we start the pre hook processing and causes inconsistent events publishing.
QueryPlan plan = hookContext.getQueryPlan();
if (plan == null) {
LOG.debug("Received null query plan.");
@@ -343,7 +329,10 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
builder.setHiveQueryId(plan.getQueryId());
builder.setUser(getUser(hookContext));
builder.setRequestUser(getRequestUser(hookContext));
- builder.setQueue(conf.get("mapreduce.job.queuename"));
+ String queueName = getQueueName(executionMode, conf);
+ if (queueName != null) {
+ builder.setQueue(queueName);
+ }
builder.setExecutionMode(executionMode.name());
builder.addAllTablesRead(getTablesFromEntitySet(hookContext.getInputs()));
builder.addAllTablesWritten(getTablesFromEntitySet(hookContext.getOutputs()));
@@ -385,7 +374,6 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
ApplicationId llapId = determineLlapId(conf, executionMode);
if (llapId != null) {
addMapEntry(builder, OtherInfoType.LLAP_APP_ID, llapId.toString());
- builder.setQueue(conf.get(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname));
}
conf.stripHiddenConfigurations(conf);
@@ -439,6 +427,21 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
return requestuser;
}
+ private String getQueueName(ExecutionMode mode, HiveConf conf) {
+ switch (mode) {
+ case LLAP:
+ return conf.get(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname);
+ case MR:
+ return conf.get(MRJobConfig.QUEUE_NAME);
+ case TEZ:
+ return conf.get(TezConfiguration.TEZ_QUEUE_NAME);
+ case SPARK:
+ case NONE:
+ default:
+ return null;
+ }
+ }
+
private List<String> getTablesFromEntitySet(Set<? extends Entity> entities) {
List<String> tableNames = new ArrayList<>();
for (Entity entity : entities) {
http://git-wip-us.apache.org/repos/asf/hive/blob/463a5125/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
index 8124528..a5939fa 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.hooks;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -30,15 +29,22 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.EventLogger;
import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.EventType;
+import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.ExecutionMode;
import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.OtherInfoType;
import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.HiveHookEventProto;
import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger;
import org.apache.tez.dag.history.logging.proto.ProtoMessageReader;
import org.junit.Assert;
@@ -63,6 +69,9 @@ public class TestHiveProtoLoggingHook {
@Before
public void setup() throws Exception {
conf = new HiveConf();
+ conf.set(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, "llap_queue");
+ conf.set(MRJobConfig.QUEUE_NAME, "mr_queue");
+ conf.set(TezConfiguration.TEZ_QUEUE_NAME, "tez_queue");
tmpFolder = folder.newFolder().getAbsolutePath();
conf.setVar(HiveConf.ConfVars.HIVE_PROTO_EVENTS_BASE_PATH, tmpFolder);
QueryState state = new QueryState.Builder().withHiveConf(conf).build();
@@ -94,7 +103,8 @@ public class TestHiveProtoLoggingHook {
Assert.assertEquals("test_user", event.getRequestUser());
Assert.assertEquals("test_queryId", event.getHiveQueryId());
Assert.assertEquals("test_op_id", event.getOperationId());
- Assert.assertEquals("NONE", event.getExecutionMode());
+ Assert.assertEquals(ExecutionMode.NONE.name(), event.getExecutionMode());
+ Assert.assertFalse(event.hasQueue());
assertOtherInfo(event, OtherInfoType.TEZ, Boolean.FALSE.toString());
assertOtherInfo(event, OtherInfoType.MAPRED, Boolean.FALSE.toString());
@@ -108,6 +118,69 @@ public class TestHiveProtoLoggingHook {
}
@Test
+ public void testQueueLogs() throws Exception {
+ context.setHookType(HookType.PRE_EXEC_HOOK);
+ EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance());
+
+ // This makes it MR task
+ context.getQueryPlan().getRootTasks().add(new ExecDriver());
+ evtLogger.handle(context);
+
+ // This makes it Tez task
+ MapWork mapWork = new MapWork();
+ TezWork tezWork = new TezWork("test_queryid");
+ tezWork.add(mapWork);
+ TezTask task = new TezTask();
+ task.setId("id1");
+ task.setWork(tezWork);
+ context.getQueryPlan().getRootTasks().add(task);
+ context.getQueryPlan().getRootTasks().add(new TezTask());
+ evtLogger.handle(context);
+
+ // This makes it llap task
+ mapWork.setLlapMode(true);
+ evtLogger.handle(context);
+
+ evtLogger.shutdown();
+
+ ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder);
+
+ HiveHookEventProto event = reader.readEvent();
+ Assert.assertNotNull(event);
+ Assert.assertEquals(ExecutionMode.MR.name(), event.getExecutionMode());
+ Assert.assertEquals(event.getQueue(), "mr_queue");
+
+ event = reader.readEvent();
+ Assert.assertNotNull(event);
+ Assert.assertEquals(ExecutionMode.TEZ.name(), event.getExecutionMode());
+ Assert.assertEquals(event.getQueue(), "tez_queue");
+
+ event = reader.readEvent();
+ Assert.assertNotNull(event);
+ Assert.assertEquals(ExecutionMode.LLAP.name(), event.getExecutionMode());
+ Assert.assertEquals(event.getQueue(), "llap_queue");
+ }
+
+ @Test
+ public void testPreAndPostEventBoth() throws Exception {
+ context.setHookType(HookType.PRE_EXEC_HOOK);
+ EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance());
+ evtLogger.handle(context);
+ context.setHookType(HookType.POST_EXEC_HOOK);
+ evtLogger.handle(context);
+ evtLogger.shutdown();
+
+ ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder);
+ HiveHookEventProto event = reader.readEvent();
+ Assert.assertNotNull("Pre hook event not found", event);
+ Assert.assertEquals(EventType.QUERY_SUBMITTED.name(), event.getEventType());
+
+ event = reader.readEvent();
+ Assert.assertNotNull("Post hook event not found", event);
+ Assert.assertEquals(EventType.QUERY_COMPLETED.name(), event.getEventType());
+ }
+
+ @Test
public void testPostEventLog() throws Exception {
context.setHookType(HookType.POST_EXEC_HOOK);
context.getPerfLogger().PerfLogBegin("test", "LogTest");
@@ -151,18 +224,21 @@ public class TestHiveProtoLoggingHook {
assertOtherInfo(event, OtherInfoType.PERF, null);
}
- private HiveHookEventProto loadEvent(HiveConf conf, String tmpFolder)
- throws IOException, FileNotFoundException {
+ private ProtoMessageReader<HiveHookEventProto> getTestReader(HiveConf conf, String tmpFolder)
+ throws IOException {
Path path = new Path(tmpFolder);
FileSystem fs = path.getFileSystem(conf);
FileStatus[] status = fs.listStatus(path);
Assert.assertEquals(1, status.length);
status = fs.listStatus(status[0].getPath());
Assert.assertEquals(1, status.length);
-
DatePartitionedLogger<HiveHookEventProto> logger = new DatePartitionedLogger<>(
HiveHookEventProto.PARSER, path, conf, SystemClock.getInstance());
- ProtoMessageReader<HiveHookEventProto> reader = logger.getReader(status[0].getPath());
+ return logger.getReader(status[0].getPath());
+ }
+
+ private HiveHookEventProto loadEvent(HiveConf conf, String tmpFolder) throws IOException {
+ ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder);
HiveHookEventProto event = reader.readEvent();
Assert.assertNotNull(event);
return event;