You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2018/02/07 08:57:28 UTC

hive git commit: HIVE-18628: Make tez dag status check interval configurable (Prasanth Jayachandran reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master f7dea1060 -> acc62e3d5


HIVE-18628: Make tez dag status check interval configurable (Prasanth Jayachandran reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/acc62e3d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/acc62e3d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/acc62e3d

Branch: refs/heads/master
Commit: acc62e3d53c03bbc1f2d72362b0d4661c9f419fb
Parents: f7dea10
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Wed Feb 7 00:57:12 2018 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Wed Feb 7 00:57:12 2018 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  4 +-
 .../hive/jdbc/AbstractJdbcTriggersTest.java     |  2 +-
 .../jdbc/TestTriggersMoveWorkloadManager.java   | 39 +++++++++++++++++++-
 .../hive/jdbc/TestTriggersWorkloadManager.java  |  2 +-
 .../hive/ql/exec/tez/TezSessionPoolManager.java |  2 +-
 .../ql/exec/tez/TriggerValidatorRunnable.java   |  3 ++
 .../hive/ql/exec/tez/WorkloadManager.java       |  2 +-
 .../ql/exec/tez/monitoring/TezJobMonitor.java   | 11 +++---
 8 files changed, 54 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/acc62e3d/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 99e8457..eca3573 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3120,6 +3120,8 @@ public class HiveConf extends Configuration {
         true,
         "Allows hive server 2 to send progress bar update information. This is currently available"
             + " only if the execution engine is tez."),
+    TEZ_DAG_STATUS_CHECK_INTERVAL("hive.tez.dag.status.check.interval", "500ms",
+      new TimeValidator(TimeUnit.MILLISECONDS), "Interval between subsequent DAG status invocation."),
     SPARK_EXEC_INPLACE_PROGRESS("hive.spark.exec.inplace.progress", true,
         "Updates spark job execution progress in-place in the terminal."),
     TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION("hive.tez.container.max.java.heap.fraction", 0.8f,
@@ -3532,7 +3534,7 @@ public class HiveConf extends Configuration {
             Constants.LLAP_LOGGER_NAME_CONSOLE),
         "logger used for llap-daemons."),
 
-    HIVE_TRIGGER_VALIDATION_INTERVAL_MS("hive.trigger.validation.interval.ms", "500ms",
+    HIVE_TRIGGER_VALIDATION_INTERVAL("hive.trigger.validation.interval", "500ms",
       new TimeValidator(TimeUnit.MILLISECONDS),
       "Interval for validating triggers during execution of a query. Triggers defined in resource plan will get\n" +
         "validated for all SQL operations after every defined interval (default: 500ms) and corresponding action\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/acc62e3d/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
index 62ee66f..e1d8ab2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
@@ -69,7 +69,7 @@ public abstract class AbstractJdbcTriggersTest {
     conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
     conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
     conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default");
-    conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, 100, TimeUnit.MILLISECONDS);
+    conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 100, TimeUnit.MILLISECONDS);
     conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true);
     conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true);
     conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false);

http://git-wip-us.apache.org/repos/asf/hive/blob/acc62e3d/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java
index 74ca958..8aca2a6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java
@@ -59,7 +59,8 @@ public class TestTriggersMoveWorkloadManager extends AbstractJdbcTriggersTest {
     conf = new HiveConf();
     conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
     conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
-    conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, 100, TimeUnit.MILLISECONDS);
+    conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 50, TimeUnit.MILLISECONDS);
+    conf.setTimeVar(ConfVars.TEZ_DAG_STATUS_CHECK_INTERVAL, 50, TimeUnit.MILLISECONDS);
     conf.setVar(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE, "default");
     conf.setBoolean("hive.test.workload.management", true);
     conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true);
@@ -185,6 +186,42 @@ public class TestTriggersMoveWorkloadManager extends AbstractJdbcTriggersTest {
     runQueryWithTrigger(query, setCmds, killTrigger + " violated", errCaptureExpect);
   }
 
+  // TODO: this test is disabled because Tez publishes counters only after task completion, so write-side counters
+  // cannot be validated correctly (the DAG completes before trigger validation runs).
+//  @Test(timeout = 60000)
+//  public void testTriggerMoveKill() throws Exception {
+//    Expression moveExpression1 = ExpressionFactory.fromString("HDFS_BYTES_READ > 100");
+//    Expression moveExpression2 = ExpressionFactory.fromString("HDFS_BYTES_WRITTEN > 200");
+//    Trigger moveTrigger1 = new ExecutionTrigger("move_big_read", moveExpression1,
+//      new Action(Action.Type.MOVE_TO_POOL, "ETL"));
+//    Trigger killTrigger = new ExecutionTrigger("big_write_kill", moveExpression2,
+//      new Action(Action.Type.KILL_QUERY));
+//    setupTriggers(Lists.newArrayList(moveTrigger1), Lists.newArrayList(killTrigger));
+//    String query = "select t1.under_col, t1.value from " + tableName + " t1 join " + tableName +
+//      " t2 on t1.under_col>=t2.under_col order by t1.under_col, t1.value";
+//    List<String> setCmds = new ArrayList<>();
+//    setCmds.add("set hive.tez.session.events.print.summary=json");
+//    setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter");
+//    setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter");
+//    List<String> errCaptureExpect = new ArrayList<>();
+//    errCaptureExpect.add("Workload Manager Events Summary");
+//    errCaptureExpect.add("Event: GET Pool: BI Cluster %: 80.00");
+//    errCaptureExpect.add("Event: MOVE Pool: ETL Cluster %: 20.00");
+//    errCaptureExpect.add("Event: KILL Pool: null Cluster %: 0.00");
+//    errCaptureExpect.add("Event: RETURN Pool: null Cluster %: 0.00");
+//    errCaptureExpect.add("\"eventType\" : \"GET\"");
+//    errCaptureExpect.add("\"eventType\" : \"MOVE\"");
+//    errCaptureExpect.add("\"eventType\" : \"KILL\"");
+//    errCaptureExpect.add("\"eventType\" : \"RETURN\"");
+//    errCaptureExpect.add("\"name\" : \"move_big_read\"");
+//    errCaptureExpect.add("\"name\" : \"big_write_kill\"");
+//    // violation in BI queue
+//    errCaptureExpect.add("\"violationMsg\" : \"Trigger " + moveTrigger1 + " violated");
+//    // violation in ETL queue
+//    errCaptureExpect.add("\"violationMsg\" : \"Trigger " + killTrigger + " violated");
+//    runQueryWithTrigger(query, setCmds, killTrigger + " violated", errCaptureExpect);
+//  }
+
   @Test(timeout = 60000)
   public void testTriggerMoveConflictKill() throws Exception {
     Expression moveExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100");

http://git-wip-us.apache.org/repos/asf/hive/blob/acc62e3d/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
index 285e533..85391ac 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
@@ -51,7 +51,7 @@ public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManag
     conf = new HiveConf();
     conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
     conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
-    conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, 100, TimeUnit.MILLISECONDS);
+    conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 100, TimeUnit.MILLISECONDS);
     conf.setVar(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE, "default");
     conf.setBoolean("hive.test.workload.management", true);
     conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true);

http://git-wip-us.apache.org/repos/asf/hive/blob/acc62e3d/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 82fdf6c..d0b32b8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -182,7 +182,7 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
   public void initTriggers(final HiveConf conf) throws HiveException {
     if (triggerValidatorRunnable == null) {
       final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, ConfVars
-        .HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
+        .HIVE_TRIGGER_VALIDATION_INTERVAL, TimeUnit.MILLISECONDS);
       sessionTriggerProvider = new SessionTriggerProvider(openSessions, new LinkedList<>());
       triggerActionHandler = new KillTriggerActionHandler();
       triggerValidatorRunnable = new TriggerValidatorRunnable(

http://git-wip-us.apache.org/repos/asf/hive/blob/acc62e3d/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
index 6414f05..9ccaa1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
@@ -53,6 +53,9 @@ public class TriggerValidatorRunnable implements Runnable {
           for (Trigger currentTrigger : triggers) {
             String desiredCounter = currentTrigger.getExpression().getCounterLimit().getName();
             // there could be interval where desired counter value is not populated by the time we make this check
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Validating trigger: {} against currentCounters: {}", currentTrigger, currentCounters);
+            }
             if (currentCounters.containsKey(desiredCounter)) {
               long currentCounterValue = currentCounters.get(desiredCounter);
               if (currentTrigger.apply(currentCounterValue)) {

http://git-wip-us.apache.org/repos/asf/hive/blob/acc62e3d/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 98505b6..915b016 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -251,7 +251,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     allocationManager.start();
 
     final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf,
-      HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
+      HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, TimeUnit.MILLISECONDS);
     TriggerActionHandler<?> triggerActionHandler = new KillMoveTriggerActionHandler(this);
     triggerValidatorRunnable = new PerPoolTriggerValidatorRunnable(perPoolProviders, triggerActionHandler,
       triggerValidationIntervalMs);

http://git-wip-us.apache.org/repos/asf/hive/blob/acc62e3d/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
index 3558475..166ecfc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
@@ -69,10 +69,9 @@ import com.google.common.base.Preconditions;
  * completion.
  */
 public class TezJobMonitor {
+  private static final Logger LOG = LoggerFactory.getLogger(TezJobMonitor.class);
 
   static final String CLASS_NAME = TezJobMonitor.class.getName();
-  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
-  private static final int MIN_CHECK_INTERVAL = 200;
   private static final int MAX_CHECK_INTERVAL = 1000;
   private static final int MAX_RETRY_INTERVAL = 2500;
   private static final int MAX_RETRY_FAILURES = (MAX_RETRY_INTERVAL / MAX_CHECK_INTERVAL) + 1;
@@ -157,7 +156,8 @@ public class TezJobMonitor {
     DAGStatus.State lastState = null;
     boolean running = false;
 
-    long checkInterval = MIN_CHECK_INTERVAL;
+    long checkInterval = HiveConf.getTimeVar(hiveConf, HiveConf.ConfVars.TEZ_DAG_STATUS_CHECK_INTERVAL,
+      TimeUnit.MILLISECONDS);
     WmContext wmContext = null;
     while (true) {
 
@@ -179,6 +179,9 @@ public class TezJobMonitor {
           if (desiredCounters != null && !desiredCounters.isEmpty()) {
             Map<String, Long> currentCounters = getCounterValues(dagCounters, vertexNames, vertexProgressMap,
               desiredCounters, done);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Requested DAG status. checkInterval: {}. currentCounters: {}", checkInterval, currentCounters);
+            }
             wmContext.setCurrentCounters(currentCounters);
           }
         }
@@ -204,8 +207,6 @@ public class TezJobMonitor {
                 console.printInfo("Status: Running (" + dagClient.getExecutionContext() + ")\n");
                 this.executionStartTime = System.currentTimeMillis();
                 running = true;
-                // from running -> failed/succeeded, the AM breaks out of timeouts
-                checkInterval = MAX_CHECK_INTERVAL;
               }
               updateFunction.update(status, vertexProgressMap);
               break;