You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2018/08/16 07:02:14 UTC
hive git commit: Revert "BUG-108021 / BUG-108287 / HIVE-20383 :
Invalid queue name and synchronisation issues in hive proto events hook.
(Harish JP, reviewed by Anishek Agarwal)"
Repository: hive
Updated Branches:
refs/heads/master 05d4e4ebc -> 076f4d2d7
Revert "BUG-108021 / BUG-108287 / HIVE-20383 : Invalid queue name and synchronisation issues in hive proto events hook. (Harish JP, reviewed by Anishek Agarwal)"
This reverts commit 05d4e4ebcec1c97e3b4e86e7e1fc0717e5b13d05.
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/076f4d2d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/076f4d2d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/076f4d2d
Branch: refs/heads/master
Commit: 076f4d2d7f447b25b5e5d9ff74359a5ed8043945
Parents: 05d4e4e
Author: Anishek Agarwal <an...@gmail.com>
Authored: Thu Aug 16 12:32:01 2018 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Thu Aug 16 12:32:01 2018 +0530
----------------------------------------------------------------------
.../hive/ql/hooks/HiveProtoLoggingHook.java | 51 ++++++------
.../hive/ql/hooks/TestHiveProtoLoggingHook.java | 88 ++------------------
2 files changed, 30 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/076f4d2d/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index 45e1ab3..155b2be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -118,12 +118,10 @@ import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry;
import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
import org.apache.hadoop.hive.ql.plan.ExplainWork;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
-import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hive.common.util.ShutdownHookManager;
-import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger;
import org.apache.tez.dag.history.logging.proto.ProtoMessageWriter;
import org.json.JSONObject;
@@ -182,6 +180,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
private final Clock clock;
private final String logFileName;
private final DatePartitionedLogger<HiveHookEventProto> logger;
+ private final ExecutorService eventHandler;
private final ExecutorService logWriter;
private int logFileCount = 0;
private ProtoMessageWriter<HiveHookEventProto> writer;
@@ -208,6 +207,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
}
this.logger = tmpLogger;
if (logger == null) {
+ eventHandler = null;
logWriter = null;
return;
}
@@ -216,16 +216,25 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT);
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Hive Hook Proto Event Handler %d").build();
+ eventHandler = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory);
+
+ threadFactory = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Hive Hook Proto Log Writer %d").build();
logWriter = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory);
}
void shutdown() {
- if (logWriter != null) {
- logWriter.shutdown();
+ // Wait for all the events to be written off, the order of service is important
+ for (ExecutorService service : new ExecutorService[] {eventHandler, logWriter}) {
+ if (service == null) {
+ continue;
+ }
+ service.shutdown();
try {
- logWriter.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
+ service.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.warn("Got interrupted exception while waiting for events to be flushed", e);
}
@@ -237,9 +246,14 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
if (logger == null) {
return;
}
- // Note: same hookContext object is used for all the events for a given query, if we try to
- // do it async we have concurrency issues and when query cache is enabled, post event comes
- // before we start the pre hook processing and causes inconsistent events publishing.
+ try {
+ eventHandler.execute(() -> generateEvent(hookContext));
+ } catch (RejectedExecutionException e) {
+ LOG.warn("Handler queue full ignoring event: " + hookContext.getHookType());
+ }
+ }
+
+ private void generateEvent(HookContext hookContext) {
QueryPlan plan = hookContext.getQueryPlan();
if (plan == null) {
LOG.debug("Received null query plan.");
@@ -329,10 +343,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
builder.setHiveQueryId(plan.getQueryId());
builder.setUser(getUser(hookContext));
builder.setRequestUser(getRequestUser(hookContext));
- String queueName = getQueueName(executionMode, conf);
- if (queueName != null) {
- builder.setQueue(queueName);
- }
+ builder.setQueue(conf.get("mapreduce.job.queuename"));
builder.setExecutionMode(executionMode.name());
builder.addAllTablesRead(getTablesFromEntitySet(hookContext.getInputs()));
builder.addAllTablesWritten(getTablesFromEntitySet(hookContext.getOutputs()));
@@ -374,6 +385,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
ApplicationId llapId = determineLlapId(conf, executionMode);
if (llapId != null) {
addMapEntry(builder, OtherInfoType.LLAP_APP_ID, llapId.toString());
+ builder.setQueue(conf.get(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname));
}
conf.stripHiddenConfigurations(conf);
@@ -427,21 +439,6 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
return requestuser;
}
- private String getQueueName(ExecutionMode mode, HiveConf conf) {
- switch (mode) {
- case LLAP:
- return conf.get(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname);
- case MR:
- return conf.get(MRJobConfig.QUEUE_NAME);
- case TEZ:
- return conf.get(TezConfiguration.TEZ_QUEUE_NAME);
- case SPARK:
- case NONE:
- default:
- return null;
- }
- }
-
private List<String> getTablesFromEntitySet(Set<? extends Entity> entities) {
List<String> tableNames = new ArrayList<>();
for (Entity entity : entities) {
http://git-wip-us.apache.org/repos/asf/hive/blob/076f4d2d/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
index a5939fa..8124528 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.hooks;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -29,22 +30,15 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
-import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.EventLogger;
import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.EventType;
-import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.ExecutionMode;
import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.OtherInfoType;
import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.HiveHookEventProto;
import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger;
import org.apache.tez.dag.history.logging.proto.ProtoMessageReader;
import org.junit.Assert;
@@ -69,9 +63,6 @@ public class TestHiveProtoLoggingHook {
@Before
public void setup() throws Exception {
conf = new HiveConf();
- conf.set(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, "llap_queue");
- conf.set(MRJobConfig.QUEUE_NAME, "mr_queue");
- conf.set(TezConfiguration.TEZ_QUEUE_NAME, "tez_queue");
tmpFolder = folder.newFolder().getAbsolutePath();
conf.setVar(HiveConf.ConfVars.HIVE_PROTO_EVENTS_BASE_PATH, tmpFolder);
QueryState state = new QueryState.Builder().withHiveConf(conf).build();
@@ -103,8 +94,7 @@ public class TestHiveProtoLoggingHook {
Assert.assertEquals("test_user", event.getRequestUser());
Assert.assertEquals("test_queryId", event.getHiveQueryId());
Assert.assertEquals("test_op_id", event.getOperationId());
- Assert.assertEquals(ExecutionMode.NONE.name(), event.getExecutionMode());
- Assert.assertFalse(event.hasQueue());
+ Assert.assertEquals("NONE", event.getExecutionMode());
assertOtherInfo(event, OtherInfoType.TEZ, Boolean.FALSE.toString());
assertOtherInfo(event, OtherInfoType.MAPRED, Boolean.FALSE.toString());
@@ -118,69 +108,6 @@ public class TestHiveProtoLoggingHook {
}
@Test
- public void testQueueLogs() throws Exception {
- context.setHookType(HookType.PRE_EXEC_HOOK);
- EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance());
-
- // This makes it MR task
- context.getQueryPlan().getRootTasks().add(new ExecDriver());
- evtLogger.handle(context);
-
- // This makes it Tez task
- MapWork mapWork = new MapWork();
- TezWork tezWork = new TezWork("test_queryid");
- tezWork.add(mapWork);
- TezTask task = new TezTask();
- task.setId("id1");
- task.setWork(tezWork);
- context.getQueryPlan().getRootTasks().add(task);
- context.getQueryPlan().getRootTasks().add(new TezTask());
- evtLogger.handle(context);
-
- // This makes it llap task
- mapWork.setLlapMode(true);
- evtLogger.handle(context);
-
- evtLogger.shutdown();
-
- ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder);
-
- HiveHookEventProto event = reader.readEvent();
- Assert.assertNotNull(event);
- Assert.assertEquals(ExecutionMode.MR.name(), event.getExecutionMode());
- Assert.assertEquals(event.getQueue(), "mr_queue");
-
- event = reader.readEvent();
- Assert.assertNotNull(event);
- Assert.assertEquals(ExecutionMode.TEZ.name(), event.getExecutionMode());
- Assert.assertEquals(event.getQueue(), "tez_queue");
-
- event = reader.readEvent();
- Assert.assertNotNull(event);
- Assert.assertEquals(ExecutionMode.LLAP.name(), event.getExecutionMode());
- Assert.assertEquals(event.getQueue(), "llap_queue");
- }
-
- @Test
- public void testPreAndPostEventBoth() throws Exception {
- context.setHookType(HookType.PRE_EXEC_HOOK);
- EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance());
- evtLogger.handle(context);
- context.setHookType(HookType.POST_EXEC_HOOK);
- evtLogger.handle(context);
- evtLogger.shutdown();
-
- ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder);
- HiveHookEventProto event = reader.readEvent();
- Assert.assertNotNull("Pre hook event not found", event);
- Assert.assertEquals(EventType.QUERY_SUBMITTED.name(), event.getEventType());
-
- event = reader.readEvent();
- Assert.assertNotNull("Post hook event not found", event);
- Assert.assertEquals(EventType.QUERY_COMPLETED.name(), event.getEventType());
- }
-
- @Test
public void testPostEventLog() throws Exception {
context.setHookType(HookType.POST_EXEC_HOOK);
context.getPerfLogger().PerfLogBegin("test", "LogTest");
@@ -224,21 +151,18 @@ public class TestHiveProtoLoggingHook {
assertOtherInfo(event, OtherInfoType.PERF, null);
}
- private ProtoMessageReader<HiveHookEventProto> getTestReader(HiveConf conf, String tmpFolder)
- throws IOException {
+ private HiveHookEventProto loadEvent(HiveConf conf, String tmpFolder)
+ throws IOException, FileNotFoundException {
Path path = new Path(tmpFolder);
FileSystem fs = path.getFileSystem(conf);
FileStatus[] status = fs.listStatus(path);
Assert.assertEquals(1, status.length);
status = fs.listStatus(status[0].getPath());
Assert.assertEquals(1, status.length);
+
DatePartitionedLogger<HiveHookEventProto> logger = new DatePartitionedLogger<>(
HiveHookEventProto.PARSER, path, conf, SystemClock.getInstance());
- return logger.getReader(status[0].getPath());
- }
-
- private HiveHookEventProto loadEvent(HiveConf conf, String tmpFolder) throws IOException {
- ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder);
+ ProtoMessageReader<HiveHookEventProto> reader = logger.getReader(status[0].getPath());
HiveHookEventProto event = reader.readEvent();
Assert.assertNotNull(event);
return event;