Posted to commits@hive.apache.org by jd...@apache.org on 2017/01/24 19:18:26 UTC

hive git commit: HIVE-15541: Hive OOM when ATSHook enabled and ATS goes down (Jason Dere, reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master e37500a88 -> e9475e482


HIVE-15541: Hive OOM when ATSHook enabled and ATS goes down (Jason Dere, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e9475e48
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e9475e48
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e9475e48

Branch: refs/heads/master
Commit: e9475e48200ac023d86fa96f5c83fe36198a8593
Parents: e37500a
Author: Jason Dere <jd...@hortonworks.com>
Authored: Tue Jan 24 11:16:54 2017 -0800
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Tue Jan 24 11:16:54 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../apache/hadoop/hive/ql/hooks/ATSHook.java    | 182 +++++++++++--------
 2 files changed, 114 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
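
A note on the failure mode this patch addresses: Executors.newSingleThreadExecutor() is backed by an unbounded LinkedBlockingQueue, so when the lone worker thread blocks inside the ATS call, every later submit() still succeeds and its Runnable (which captures the full HookContext and query plan) is retained until the heap is exhausted. The patch swaps in a ThreadPoolExecutor over a bounded queue so that submissions fail fast instead. A minimal standalone sketch of the difference (illustrative only; the capacity and class names here are not from the patch):

    import java.util.concurrent.*;

    public class QueueGrowthSketch {
      public static void main(String[] args) {
        // Old behavior: unbounded queue, so submissions never fail and memory
        // grows for as long as the single worker stays blocked.
        ExecutorService unbounded = Executors.newSingleThreadExecutor();

        // New behavior: bounded queue, so outstanding work is capped at 64 entries.
        ExecutorService bounded = new ThreadPoolExecutor(
            1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(64));

        Runnable blockForever = new Runnable() {
          @Override
          public void run() {
            try { Thread.sleep(Long.MAX_VALUE); } catch (InterruptedException ignored) { }
          }
        };

        unbounded.submit(blockForever); // would accept any number more without complaint
        try {
          for (int i = 0; i < 100; i++) {
            bounded.submit(blockForever); // first fills the queue, then throws
          }
        } catch (RejectedExecutionException e) {
          System.out.println("Rejected once the queue filled, as intended");
        }
        unbounded.shutdownNow();
        bounded.shutdownNow();
      }
    }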


http://git-wip-us.apache.org/repos/asf/hive/blob/e9475e48/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1dbae40..fc9734b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -504,6 +504,9 @@ public class HiveConf extends Configuration {
         "Comma-separated list of statistics publishers to be invoked on counters on each job. \n" +
         "A client stats publisher is specified as the name of a Java class which implements the \n" +
         "org.apache.hadoop.hive.ql.stats.ClientStatsPublisher interface."),
+    ATSHOOKQUEUECAPACITY("hive.ats.hook.queue.capacity", 64,
+        "Queue size for the ATS Hook executor. If the number of outstanding submissions \n" +
+        "to the ATS executor exceeds this amount, the Hive ATS Hook will not try to log queries to ATS."),
     EXECPARALLEL("hive.exec.parallel", false, "Whether to execute jobs in parallel"),
     EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8,
         "How many jobs at most can be executed in parallel"),

http://git-wip-us.apache.org/repos/asf/hive/blob/e9475e48/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
index 3651c9c..d1c7953 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
@@ -25,8 +25,12 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
@@ -62,6 +66,7 @@ public class ATSHook implements ExecuteWithHookContext {
   private static final Object LOCK = new Object();
   private static final int VERSION = 2;
   private static ExecutorService executor;
+  private static ExecutorService senderExecutor;
   private static TimelineClient timelineClient;
   private enum EntityTypes { HIVE_QUERY_ID };
   private enum EventTypes { QUERY_SUBMITTED, QUERY_COMPLETED };
@@ -84,12 +89,29 @@ public class ATSHook implements ExecuteWithHookContext {
     PerfLogger.GET_SPLITS, PerfLogger.RUN_TASKS,
   };
 
-  public ATSHook() {
+  private static void setupAtsExecutor(HiveConf conf) {
     synchronized(LOCK) {
       if (executor == null) {
 
-        executor = Executors.newSingleThreadExecutor(
-           new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ATS Logger %d").build());
+        // The call to ATS appears to block indefinitely, blocking the ATS thread while
+        // the hook continues to submit work to the ExecutorService with each query.
+        // Over time the queued items can cause OOM as the HookContext seems to contain
+        // some items which use a lot of memory.
+        // Prevent this situation by creating executor with bounded capacity -
+        // the event will not be sent to ATS if there are too many outstanding work submissions.
+        int queueCapacity = conf.getIntVar(HiveConf.ConfVars.ATSHOOKQUEUECAPACITY);
+
+        // Executor to create the ATS events.
+        // This can use significant resources and should not be done on the main query thread.
+        LOG.info("Creating ATS executor queue with capacity " + queueCapacity);
+        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueCapacity);
+        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ATS Logger %d").build();
+        executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, queue, threadFactory);
+
+        // Create a separate thread to send the events.
+        // Keep separate from the creating events in case the send blocks.
+        BlockingQueue<Runnable> senderQueue = new LinkedBlockingQueue<Runnable>(queueCapacity);
+        senderExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, senderQueue, threadFactory);
 
         YarnConfiguration yarnConf = new YarnConfiguration();
         timelineClient = TimelineClient.createTimelineClient();
@@ -109,7 +131,9 @@ public class ATSHook implements ExecuteWithHookContext {
         });
       }
     }
+  }
 
+  public ATSHook() {
     LOG.info("Created ATS Hook");
   }
 
@@ -118,77 +142,84 @@ public class ATSHook implements ExecuteWithHookContext {
     final long currentTime = System.currentTimeMillis();
     final HiveConf conf = new HiveConf(hookContext.getConf());
     final QueryState queryState = hookContext.getQueryState();
+    final String queryId = queryState.getQueryId();
 
-    executor.submit(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            QueryPlan plan = hookContext.getQueryPlan();
-            if (plan == null) {
-              return;
-            }
-            String queryId = plan.getQueryId();
-            String opId = hookContext.getOperationId();
-            long queryStartTime = plan.getQueryStartTime();
-            String user = hookContext.getUgi().getUserName();
-            String requestuser = hookContext.getUserName();
-            if (hookContext.getUserName() == null ){
-              requestuser = hookContext.getUgi().getUserName() ;
-            }
-            int numMrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
-            int numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size();
-            if (numMrJobs + numTezJobs <= 0) {
-              return; // ignore client only queries
-            }
+    try {
+      setupAtsExecutor(conf);
+
+      executor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              QueryPlan plan = hookContext.getQueryPlan();
+              if (plan == null) {
+                return;
+              }
+              String queryId = plan.getQueryId();
+              String opId = hookContext.getOperationId();
+              long queryStartTime = plan.getQueryStartTime();
+              String user = hookContext.getUgi().getUserName();
+              String requestuser = hookContext.getUserName();
+              if (hookContext.getUserName() == null ){
+                requestuser = hookContext.getUgi().getUserName() ;
+              }
+              int numMrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
+              int numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size();
+              if (numMrJobs + numTezJobs <= 0) {
+                return; // ignore client only queries
+              }
 
-            switch(hookContext.getHookType()) {
-            case PRE_EXEC_HOOK:
-            ExplainConfiguration config = new ExplainConfiguration();
-            config.setFormatted(true);
-            ExplainWork work = new ExplainWork(null,// resFile
-                null,// pCtx
-                plan.getRootTasks(),// RootTasks
-                plan.getFetchTask(),// FetchTask
-                null,// analyzer
-                config, //explainConfig
-                null// cboInfo
-            );
-              @SuppressWarnings("unchecked")
-              ExplainTask explain = (ExplainTask) TaskFactory.get(work, conf);
-              explain.initialize(queryState, plan, null, null);
-              String query = plan.getQueryStr();
-              JSONObject explainPlan = explain.getJSONPlan(null, work);
-              String logID = conf.getLogIdVar(hookContext.getSessionId());
-              List<String> tablesRead = getTablesFromEntitySet(hookContext.getInputs());
-              List<String> tablesWritten = getTablesFromEntitySet(hookContext.getOutputs());
-              String executionMode = getExecutionMode(plan).name();
-              String hiveInstanceAddress = hookContext.getHiveInstanceAddress();
-              if (hiveInstanceAddress == null) {
-                hiveInstanceAddress = InetAddress.getLocalHost().getHostAddress();
+              switch(hookContext.getHookType()) {
+              case PRE_EXEC_HOOK:
+              ExplainConfiguration config = new ExplainConfiguration();
+              config.setFormatted(true);
+              ExplainWork work = new ExplainWork(null,// resFile
+                  null,// pCtx
+                  plan.getRootTasks(),// RootTasks
+                  plan.getFetchTask(),// FetchTask
+                  null,// analyzer
+                  config, //explainConfig
+                  null// cboInfo
+              );
+                @SuppressWarnings("unchecked")
+                ExplainTask explain = (ExplainTask) TaskFactory.get(work, conf);
+                explain.initialize(queryState, plan, null, null);
+                String query = plan.getQueryStr();
+                JSONObject explainPlan = explain.getJSONPlan(null, work);
+                String logID = conf.getLogIdVar(hookContext.getSessionId());
+                List<String> tablesRead = getTablesFromEntitySet(hookContext.getInputs());
+                List<String> tablesWritten = getTablesFromEntitySet(hookContext.getOutputs());
+                String executionMode = getExecutionMode(plan).name();
+                String hiveInstanceAddress = hookContext.getHiveInstanceAddress();
+                if (hiveInstanceAddress == null) {
+                  hiveInstanceAddress = InetAddress.getLocalHost().getHostAddress();
+                }
+                String hiveInstanceType = hookContext.isHiveServerQuery() ? "HS2" : "CLI";
+                fireAndForget(
+                    createPreHookEvent(queryId, query, explainPlan, queryStartTime,
+                        user, requestuser, numMrJobs, numTezJobs, opId,
+                        hookContext.getIpAddress(), hiveInstanceAddress, hiveInstanceType,
+                        hookContext.getSessionId(), logID, hookContext.getThreadId(), executionMode,
+                        tablesRead, tablesWritten, conf));
+                break;
+              case POST_EXEC_HOOK:
+                fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser, true, opId, hookContext.getPerfLogger()));
+                break;
+              case ON_FAILURE_HOOK:
+                fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser , false, opId, hookContext.getPerfLogger()));
+                break;
+              default:
+                //ignore
+                break;
               }
-              String hiveInstanceType = hookContext.isHiveServerQuery() ? "HS2" : "CLI";
-              fireAndForget(conf,
-                  createPreHookEvent(queryId, query, explainPlan, queryStartTime,
-                      user, requestuser, numMrJobs, numTezJobs, opId,
-                      hookContext.getIpAddress(), hiveInstanceAddress, hiveInstanceType,
-                      hookContext.getSessionId(), logID, hookContext.getThreadId(), executionMode,
-                      tablesRead, tablesWritten, conf));
-              break;
-            case POST_EXEC_HOOK:
-              fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser, true, opId, hookContext.getPerfLogger()));
-              break;
-            case ON_FAILURE_HOOK:
-              fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser , false, opId, hookContext.getPerfLogger()));
-              break;
-            default:
-              //ignore
-              break;
+            } catch (Exception e) {
+              LOG.warn("Failed to submit plan to ATS for " + queryId, e);
             }
-          } catch (Exception e) {
-            LOG.info("Failed to submit plan to ATS: " + StringUtils.stringifyException(e));
           }
-        }
-      });
+        });
+    } catch (Exception e) {
+      LOG.warn("Failed to submit to ATS for " + queryId, e);
+    }
   }
 
   protected List<String> getTablesFromEntitySet(Set<? extends Entity> entities) {
@@ -319,7 +350,16 @@ public class ATSHook implements ExecuteWithHookContext {
     return atsEntity;
   }
 
-  synchronized void fireAndForget(Configuration conf, TimelineEntity entity) throws Exception {
-    timelineClient.putEntities(entity);
+  void fireAndForget(final TimelineEntity entity) throws Exception {
+    senderExecutor.submit(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          timelineClient.putEntities(entity);
+        } catch (Exception err) {
+          LOG.warn("Failed to send event to ATS", err);
+        }
+      }
+    });
   }
 }
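
For readers following the fireAndForget() change: the send now happens on a second bounded executor, so a hung putEntities() call can only fill the sender queue and stall other sends, never the thread that builds events from the HookContext. A condensed sketch of that two-lane shape (names simplified and error handling trimmed; not the literal patch code):

    import java.util.concurrent.*;

    public class TwoLaneHookSketch {
      private static ExecutorService bounded(int capacity) {
        // Single worker over a bounded queue; submit() throws
        // RejectedExecutionException once the queue is full.
        return new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(capacity));
      }

      // Lane 1 builds the event (explain plan, JSON); lane 2 performs the send.
      private final ExecutorService creator = bounded(64);
      private final ExecutorService sender = bounded(64);

      public void onQueryEvent() {
        try {
          creator.submit(new Runnable() {
            @Override
            public void run() {
              // ... build the TimelineEntity here, then hand off the blocking send:
              sender.submit(new Runnable() {
                @Override
                public void run() {
                  // ... stand-in for timelineClient.putEntities(entity)
                }
              });
            }
          });
        } catch (RejectedExecutionException e) {
          // Creation queue full: drop this event rather than accumulate contexts.
        }
      }
    }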