You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2017/01/24 19:18:26 UTC
hive git commit: HIVE-15541: Hive OOM when ATSHook enabled and ATS goes down (Jason Dere, reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master e37500a88 -> e9475e482
HIVE-15541: Hive OOM when ATSHook enabled and ATS goes down (Jason Dere, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e9475e48
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e9475e48
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e9475e48
Branch: refs/heads/master
Commit: e9475e48200ac023d86fa96f5c83fe36198a8593
Parents: e37500a
Author: Jason Dere <jd...@hortonworks.com>
Authored: Tue Jan 24 11:16:54 2017 -0800
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Tue Jan 24 11:16:54 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../apache/hadoop/hive/ql/hooks/ATSHook.java | 182 +++++++++++--------
2 files changed, 114 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e9475e48/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1dbae40..fc9734b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -504,6 +504,9 @@ public class HiveConf extends Configuration {
"Comma-separated list of statistics publishers to be invoked on counters on each job. \n" +
"A client stats publisher is specified as the name of a Java class which implements the \n" +
"org.apache.hadoop.hive.ql.stats.ClientStatsPublisher interface."),
+ ATSHOOKQUEUECAPACITY("hive.ats.hook.queue.capacity", 64,
+ "Queue size for the ATS Hook executor. If the number of outstanding submissions \n" +
+ "to the ATS executor exceed this amount, the Hive ATS Hook will not try to log queries to ATS."),
EXECPARALLEL("hive.exec.parallel", false, "Whether to execute jobs in parallel"),
EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8,
"How many jobs at most can be executed in parallel"),
http://git-wip-us.apache.org/repos/asf/hive/blob/e9475e48/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
index 3651c9c..d1c7953 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
@@ -25,8 +25,12 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
@@ -62,6 +66,7 @@ public class ATSHook implements ExecuteWithHookContext {
private static final Object LOCK = new Object();
private static final int VERSION = 2;
private static ExecutorService executor;
+ private static ExecutorService senderExecutor;
private static TimelineClient timelineClient;
private enum EntityTypes { HIVE_QUERY_ID };
private enum EventTypes { QUERY_SUBMITTED, QUERY_COMPLETED };
@@ -84,12 +89,29 @@ public class ATSHook implements ExecuteWithHookContext {
PerfLogger.GET_SPLITS, PerfLogger.RUN_TASKS,
};
- public ATSHook() {
+ private static void setupAtsExecutor(HiveConf conf) {
synchronized(LOCK) {
if (executor == null) {
- executor = Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ATS Logger %d").build());
+ // The call to ATS appears to block indefinitely, blocking the ATS thread while
+ // the hook continues to submit work to the ExecutorService with each query.
+ // Over time the queued items can cause OOM as the HookContext seems to contain
+ // some items which use a lot of memory.
+ // Prevent this situation by creating executor with bounded capacity -
+ // the event will not be sent to ATS if there are too many outstanding work submissions.
+ int queueCapacity = conf.getIntVar(HiveConf.ConfVars.ATSHOOKQUEUECAPACITY);
+
+ // Executor to create the ATS events.
+ // This can use significant resources and should not be done on the main query thread.
+ LOG.info("Creating ATS executor queue with capacity " + queueCapacity);
+ BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueCapacity);
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ATS Logger %d").build();
+ executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, queue, threadFactory);
+
+ // Create a separate thread to send the events.
+ // Keep separate from the creating events in case the send blocks.
+ BlockingQueue<Runnable> senderQueue = new LinkedBlockingQueue<Runnable>(queueCapacity);
+ senderExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, senderQueue, threadFactory);
YarnConfiguration yarnConf = new YarnConfiguration();
timelineClient = TimelineClient.createTimelineClient();
@@ -109,7 +131,9 @@ public class ATSHook implements ExecuteWithHookContext {
});
}
}
+ }
+ public ATSHook() {
LOG.info("Created ATS Hook");
}
@@ -118,77 +142,84 @@ public class ATSHook implements ExecuteWithHookContext {
final long currentTime = System.currentTimeMillis();
final HiveConf conf = new HiveConf(hookContext.getConf());
final QueryState queryState = hookContext.getQueryState();
+ final String queryId = queryState.getQueryId();
- executor.submit(new Runnable() {
- @Override
- public void run() {
- try {
- QueryPlan plan = hookContext.getQueryPlan();
- if (plan == null) {
- return;
- }
- String queryId = plan.getQueryId();
- String opId = hookContext.getOperationId();
- long queryStartTime = plan.getQueryStartTime();
- String user = hookContext.getUgi().getUserName();
- String requestuser = hookContext.getUserName();
- if (hookContext.getUserName() == null ){
- requestuser = hookContext.getUgi().getUserName() ;
- }
- int numMrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
- int numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size();
- if (numMrJobs + numTezJobs <= 0) {
- return; // ignore client only queries
- }
+ try {
+ setupAtsExecutor(conf);
+
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ QueryPlan plan = hookContext.getQueryPlan();
+ if (plan == null) {
+ return;
+ }
+ String queryId = plan.getQueryId();
+ String opId = hookContext.getOperationId();
+ long queryStartTime = plan.getQueryStartTime();
+ String user = hookContext.getUgi().getUserName();
+ String requestuser = hookContext.getUserName();
+ if (hookContext.getUserName() == null ){
+ requestuser = hookContext.getUgi().getUserName() ;
+ }
+ int numMrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
+ int numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size();
+ if (numMrJobs + numTezJobs <= 0) {
+ return; // ignore client only queries
+ }
- switch(hookContext.getHookType()) {
- case PRE_EXEC_HOOK:
- ExplainConfiguration config = new ExplainConfiguration();
- config.setFormatted(true);
- ExplainWork work = new ExplainWork(null,// resFile
- null,// pCtx
- plan.getRootTasks(),// RootTasks
- plan.getFetchTask(),// FetchTask
- null,// analyzer
- config, //explainConfig
- null// cboInfo
- );
- @SuppressWarnings("unchecked")
- ExplainTask explain = (ExplainTask) TaskFactory.get(work, conf);
- explain.initialize(queryState, plan, null, null);
- String query = plan.getQueryStr();
- JSONObject explainPlan = explain.getJSONPlan(null, work);
- String logID = conf.getLogIdVar(hookContext.getSessionId());
- List<String> tablesRead = getTablesFromEntitySet(hookContext.getInputs());
- List<String> tablesWritten = getTablesFromEntitySet(hookContext.getOutputs());
- String executionMode = getExecutionMode(plan).name();
- String hiveInstanceAddress = hookContext.getHiveInstanceAddress();
- if (hiveInstanceAddress == null) {
- hiveInstanceAddress = InetAddress.getLocalHost().getHostAddress();
+ switch(hookContext.getHookType()) {
+ case PRE_EXEC_HOOK:
+ ExplainConfiguration config = new ExplainConfiguration();
+ config.setFormatted(true);
+ ExplainWork work = new ExplainWork(null,// resFile
+ null,// pCtx
+ plan.getRootTasks(),// RootTasks
+ plan.getFetchTask(),// FetchTask
+ null,// analyzer
+ config, //explainConfig
+ null// cboInfo
+ );
+ @SuppressWarnings("unchecked")
+ ExplainTask explain = (ExplainTask) TaskFactory.get(work, conf);
+ explain.initialize(queryState, plan, null, null);
+ String query = plan.getQueryStr();
+ JSONObject explainPlan = explain.getJSONPlan(null, work);
+ String logID = conf.getLogIdVar(hookContext.getSessionId());
+ List<String> tablesRead = getTablesFromEntitySet(hookContext.getInputs());
+ List<String> tablesWritten = getTablesFromEntitySet(hookContext.getOutputs());
+ String executionMode = getExecutionMode(plan).name();
+ String hiveInstanceAddress = hookContext.getHiveInstanceAddress();
+ if (hiveInstanceAddress == null) {
+ hiveInstanceAddress = InetAddress.getLocalHost().getHostAddress();
+ }
+ String hiveInstanceType = hookContext.isHiveServerQuery() ? "HS2" : "CLI";
+ fireAndForget(
+ createPreHookEvent(queryId, query, explainPlan, queryStartTime,
+ user, requestuser, numMrJobs, numTezJobs, opId,
+ hookContext.getIpAddress(), hiveInstanceAddress, hiveInstanceType,
+ hookContext.getSessionId(), logID, hookContext.getThreadId(), executionMode,
+ tablesRead, tablesWritten, conf));
+ break;
+ case POST_EXEC_HOOK:
+ fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser, true, opId, hookContext.getPerfLogger()));
+ break;
+ case ON_FAILURE_HOOK:
+ fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser , false, opId, hookContext.getPerfLogger()));
+ break;
+ default:
+ //ignore
+ break;
}
- String hiveInstanceType = hookContext.isHiveServerQuery() ? "HS2" : "CLI";
- fireAndForget(conf,
- createPreHookEvent(queryId, query, explainPlan, queryStartTime,
- user, requestuser, numMrJobs, numTezJobs, opId,
- hookContext.getIpAddress(), hiveInstanceAddress, hiveInstanceType,
- hookContext.getSessionId(), logID, hookContext.getThreadId(), executionMode,
- tablesRead, tablesWritten, conf));
- break;
- case POST_EXEC_HOOK:
- fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser, true, opId, hookContext.getPerfLogger()));
- break;
- case ON_FAILURE_HOOK:
- fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser , false, opId, hookContext.getPerfLogger()));
- break;
- default:
- //ignore
- break;
+ } catch (Exception e) {
+ LOG.warn("Failed to submit plan to ATS for " + queryId, e);
}
- } catch (Exception e) {
- LOG.info("Failed to submit plan to ATS: " + StringUtils.stringifyException(e));
}
- }
- });
+ });
+ } catch (Exception e) {
+ LOG.warn("Failed to submit to ATS for " + queryId, e);
+ }
}
protected List<String> getTablesFromEntitySet(Set<? extends Entity> entities) {
@@ -319,7 +350,16 @@ public class ATSHook implements ExecuteWithHookContext {
return atsEntity;
}
- synchronized void fireAndForget(Configuration conf, TimelineEntity entity) throws Exception {
- timelineClient.putEntities(entity);
+ void fireAndForget(final TimelineEntity entity) throws Exception {
+ senderExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ timelineClient.putEntities(entity);
+ } catch (Exception err) {
+ LOG.warn("Failed to send event to ATS", err);
+ }
+ }
+ });
}
}