Posted to commits@hive.apache.org by kg...@apache.org on 2020/03/03 13:45:35 UTC

[hive] branch master updated: HIVE-22872: Support multiple executors for scheduled queries (Zoltan Haindrich reviewed by Jesus Camacho Rodriguez)

This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cdf97f  HIVE-22872: Support multiple executors for scheduled queries (Zoltan Haindrich reviewed by Jesus Camacho Rodriguez)
9cdf97f is described below

commit 9cdf97f3f851fd835f3c5caae676e1cd737816ec
Author: Zoltan Haindrich <ki...@rxd.hu>
AuthorDate: Tue Mar 3 13:34:18 2020 +0000

    HIVE-22872: Support multiple executors for scheduled queries (Zoltan Haindrich reviewed by Jesus Camacho Rodriguez)
    
    Signed-off-by: Zoltan Haindrich <ki...@rxd.hu>
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   2 +
 .../upgrade/hive/hive-schema-4.0.0.hive.sql        |   7 +-
 .../exec/schq/ScheduledQueryMaintenanceTask.java   |   8 +-
 .../scheduled/ScheduledQueryExecutionContext.java  |   4 +
 .../scheduled/ScheduledQueryExecutionService.java  | 170 +++++++++++++++++----
 .../hive/ql/schq/TestScheduledQueryService.java    |   4 -
 ql/src/test/queries/clientpositive/schq_analyze.q  |   2 +-
 .../queries/clientpositive/schq_materialized.q     |   2 +-
 .../clientpositive/llap/schq_materialized.q.out    |   2 +-
 .../test/results/clientpositive/llap/sysdb.q.out   |  10 +-
 .../results/clientpositive/llap/sysdb_schq.q.out   |   6 +-
 .../hadoop/hive/metastore/MetastoreTaskThread.java |  10 +-
 .../hadoop/hive/metastore/utils/package-info.java  |  22 ---
 .../apache/hadoop/hive/metastore/ObjectStore.java  |  23 ++-
 .../ScheduledQueryExecutionsMaintTask.java         |   7 +
 .../hive/metastore/model/MScheduledQuery.java      |   9 ++
 .../src/main/resources/package.jdo                 |   4 +
 .../src/main/sql/derby/hive-schema-4.0.0.derby.sql |   3 +-
 .../sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql     |   2 +
 .../src/main/sql/mssql/hive-schema-4.0.0.mssql.sql |  29 ++++
 .../sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql     |  30 ++++
 .../src/main/sql/mysql/hive-schema-4.0.0.mysql.sql |   1 +
 .../sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql     |   2 +
 .../main/sql/oracle/hive-schema-4.0.0.oracle.sql   |   1 +
 .../sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql   |   2 +
 .../sql/postgres/hive-schema-4.0.0.postgres.sql    |   1 +
 .../postgres/upgrade-3.2.0-to-4.0.0.postgres.sql   |   2 +
 27 files changed, 290 insertions(+), 75 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3d4e9e0..7ea2de9 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4873,6 +4873,8 @@ public class HiveConf extends Configuration {
     HIVE_SECURITY_AUTHORIZATION_SCHEDULED_QUERIES_SUPPORTED("hive.security.authorization.scheduled.queries.supported",
         false,
         "Enable this if the configured authorizer is able to handle scheduled query related calls."),
+    HIVE_SCHEDULED_QUERIES_MAX_EXECUTORS("hive.scheduled.queries.max.executors", 4, new RangeValidator(1, null),
+        "Maximal number of scheduled query executors to allow."),
 
     HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true,
         "If the query results cache is enabled. This will keep results of previously executed queries " +
diff --git a/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql b/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql
index fde6f02..03540bb 100644
--- a/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql
+++ b/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql
@@ -1211,6 +1211,7 @@ CREATE EXTERNAL TABLE IF NOT EXISTS `SCHEDULED_QUERIES` (
   `USER` string,
   `QUERY` string,
   `NEXT_EXECUTION` bigint,
+  `ACTIVE_EXECUTION_ID` bigint,
   CONSTRAINT `SYS_PK_SCHEDULED_QUERIES` PRIMARY KEY (`SCHEDULED_QUERY_ID`) DISABLE
 )
 STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
@@ -1225,7 +1226,8 @@ TBLPROPERTIES (
   \"SCHEDULE\",
   \"USER\",
   \"QUERY\",
-  \"NEXT_EXECUTION\"
+  \"NEXT_EXECUTION\",
+  \"ACTIVE_EXECUTION_ID\"
 FROM
   \"SCHEDULED_QUERIES\""
 );
@@ -1795,7 +1797,8 @@ select
   `SCHEDULE`,
   `USER`,
   `QUERY`,
-  FROM_UNIXTIME(NEXT_EXECUTION) as NEXT_EXECUTION
+  FROM_UNIXTIME(NEXT_EXECUTION) as NEXT_EXECUTION,
+  `ACTIVE_EXECUTION_ID`
 FROM
   SYS.SCHEDULED_QUERIES
 ;
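
With ACTIVE_EXECUTION_ID exposed through the SYS database, a quick way to see which schedules currently hold an active execution could be:

    -- active_execution_id stays NULL for schedules that are not currently running
    select schedule_name, enabled, next_execution, active_execution_id
    from sys.scheduled_queries;
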
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java
index fd0c173..5abfa4d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.schq;
 
 import org.apache.hadoop.hive.metastore.api.ScheduledQueryMaintenanceRequest;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryMaintenanceRequestType;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -46,7 +47,11 @@ public class ScheduledQueryMaintenanceTask extends Task<ScheduledQueryMaintenanc
     ScheduledQueryMaintenanceRequest request = buildScheduledQueryRequest();
     try {
       Hive.get().getMSC().scheduledQueryMaintenance(request);
-      if (work.getScheduledQuery().isSetNextExecution()) {
+      if (work.getScheduledQuery().isSetNextExecution()
+          || request.getType() == ScheduledQueryMaintenanceRequestType.CREATE) {
+        // a scheduled query might become available for execution immediately:
+        // * when a schedule is altered to execute at a specific time
+        // * when a new scheduled query is created - e.g. one scheduled to run every second
         ScheduledQueryExecutionService.forceScheduleCheck();
       }
     } catch (TException | HiveException e) {
@@ -68,5 +73,4 @@ public class ScheduledQueryMaintenanceTask extends Task<ScheduledQueryMaintenanc
   public StageType getType() {
     return StageType.SCHEDULED_QUERY_MAINT;
   }
-
 }
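
Both creating a scheduled query and altering one to execute immediately now call forceScheduleCheck(), so the poller picks the work up without waiting for a full poll interval. The statements exercised in ql/src/test/queries/clientpositive/schq_analyze.q later in this diff illustrate both paths:

    create scheduled query t_analyze cron '0 */1 * * * ? *' as analyze table t compute statistics for columns;
    alter scheduled query t_analyze execute;
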
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java
index 1bb24ee..32cb316 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java
@@ -66,4 +66,8 @@ public class ScheduledQueryExecutionContext {
     return conf.getTimeVar(ConfVars.HIVE_SCHEDULED_QUERIES_EXECUTOR_PROGRESS_REPORT_INTERVAL, TimeUnit.MILLISECONDS);
   }
 
+  public int getNumberOfExecutors() {
+    return conf.getIntVar(ConfVars.HIVE_SCHEDULED_QUERIES_MAX_EXECUTORS);
+  }
+
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java
index 9a6237c..8443b3f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java
@@ -19,13 +19,18 @@ package org.apache.hadoop.hive.ql.scheduled;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.QueryState;
 import org.apache.hadoop.hive.metastore.api.ScheduledQueryKey;
 import org.apache.hadoop.hive.metastore.api.ScheduledQueryPollResponse;
@@ -48,18 +53,29 @@ public class ScheduledQueryExecutionService implements Closeable {
   private static ScheduledQueryExecutionService INSTANCE = null;
 
   private ScheduledQueryExecutionContext context;
-  private ScheduledQueryExecutor worker;
   private AtomicInteger forcedScheduleCheckCounter = new AtomicInteger();
+  private AtomicInteger usedExecutors = new AtomicInteger(0);
+  private Queue<ScheduledQueryExecutor> runningExecutors = new ConcurrentLinkedQueue<>();
 
   public static ScheduledQueryExecutionService startScheduledQueryExecutorService(HiveConf inputConf) {
     HiveConf conf = new HiveConf(inputConf);
     MetastoreBasedScheduledQueryService qService = new MetastoreBasedScheduledQueryService(conf);
-    ExecutorService executor = Executors.newCachedThreadPool(
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Scheduled Query Thread %d").build());
+    ExecutorService executor = buildExecutor(conf);
     ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService);
     return startScheduledQueryExecutorService(ctx);
   }
 
+  private static ExecutorService buildExecutor(HiveConf conf) {
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Scheduled Query Thread %d").build();
+    int systemThreads = 2; // poller,reporter
+    int minServiceThreads = 1; // always keep 1 thread to be used for executing scheduled queries
+    int maxServiceThreads = conf.getIntVar(ConfVars.HIVE_SCHEDULED_QUERIES_MAX_EXECUTORS);
+    return new ThreadPoolExecutor(systemThreads + minServiceThreads, systemThreads + maxServiceThreads,
+        60L, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(),
+        threadFactory);
+  }
+
   public static ScheduledQueryExecutionService startScheduledQueryExecutorService(ScheduledQueryExecutionContext ctx) {
     synchronized (ScheduledQueryExecutionService.class) {
       if (INSTANCE != null) {
@@ -73,7 +89,7 @@ public class ScheduledQueryExecutionService implements Closeable {
 
   private ScheduledQueryExecutionService(ScheduledQueryExecutionContext ctx) {
     context = ctx;
-    ctx.executor.submit(worker = new ScheduledQueryExecutor());
+    ctx.executor.submit(new ScheduledQueryPoller());
     ctx.executor.submit(new ProgressReporter());
   }
 
@@ -81,23 +97,52 @@ public class ScheduledQueryExecutionService implements Closeable {
     return state == QueryState.FINISHED || state == QueryState.FAILED;
   }
 
-  class ScheduledQueryExecutor implements Runnable {
+  /**
+   * Renames the {@link Thread} to make it clearer what it is working on.
+   */
+  static class NamedThread implements Closeable {
+    private final String oldName;
 
-    private ScheduledQueryProgressInfo info;
+    public NamedThread(String newName) {
+      LOG.info("Starting {} thread - renaming accordingly.", newName);
+      oldName = Thread.currentThread().getName();
+      Thread.currentThread().setName(newName);
+    }
+
+    @Override
+    public void close() {
+      LOG.info("Thread finished; renaming back to: {}", oldName);
+      Thread.currentThread().setName(oldName);
+    }
+  }
+
+  /**
+   * The poller is responsible for checking for available scheduled queries.
+   *
+   * It also handles forced wakeup calls, which matter because the default check period may be minutes long.
+   * Only one poller may be running at a time in a HiveServer2 instance.
+   */
+  class ScheduledQueryPoller implements Runnable {
 
     @Override
     public void run() {
-      while (true) {
-        ScheduledQueryPollResponse q = context.schedulerService.scheduledQueryPoll();
-        if (q.isSetExecutionId()) {
-          try{
-            processQuery(q);
-          } catch (Throwable t) {
-            LOG.error("Unexpected exception during scheduled query processing", t);
+      try (NamedThread namedThread = new NamedThread("Scheduled Query Poller")) {
+        while (!context.executor.isShutdown()) {
+          int origResets = forcedScheduleCheckCounter.get();
+          if (usedExecutors.get() < context.getNumberOfExecutors()) {
+            try {
+              ScheduledQueryPollResponse q = context.schedulerService.scheduledQueryPoll();
+              if (q.isSetExecutionId()) {
+                context.executor.submit(new ScheduledQueryExecutor(q));
+                // skip sleep and poll again if there are available executors
+                continue;
+              }
+            } catch (Throwable t) {
+              LOG.error("Unexpected exception during scheduled query submission", t);
+            }
           }
-        } else {
           try {
-            sleep(context.getIdleSleepTime());
+            sleep(context.getIdleSleepTime(), origResets);
           } catch (InterruptedException e) {
             LOG.warn("interrupt discarded");
           }
@@ -105,9 +150,8 @@ public class ScheduledQueryExecutionService implements Closeable {
       }
     }
 
-    private void sleep(long idleSleepTime) throws InterruptedException {
+    private void sleep(long idleSleepTime, int origResets) throws InterruptedException {
       long checkIntrvalMs = 1000;
-      int origResets = forcedScheduleCheckCounter.get();
       for (long i = 0; i < idleSleepTime; i += checkIntrvalMs) {
         Thread.sleep(checkIntrvalMs);
         if (forcedScheduleCheckCounter.get() != origResets) {
@@ -116,6 +160,47 @@ public class ScheduledQueryExecutionService implements Closeable {
       }
     }
 
+  }
+
+  private void executorStarted(ScheduledQueryExecutor executor) {
+    runningExecutors.add(executor);
+    usedExecutors.incrementAndGet();
+  }
+
+  private void executorStopped(ScheduledQueryExecutor executor) {
+    usedExecutors.decrementAndGet();
+    runningExecutors.remove(executor);
+    forceScheduleCheck();
+  }
+
+  /**
+   * Responsible for a single execution of a scheduled query.
+   *
+   * The execution happens in a separate thread.
+   */
+  class ScheduledQueryExecutor implements Runnable {
+
+    private ScheduledQueryProgressInfo info;
+    private final ScheduledQueryPollResponse pollResponse;
+
+    public ScheduledQueryExecutor(ScheduledQueryPollResponse pollResponse) {
+      this.pollResponse = pollResponse;
+      executorStarted(this);
+    }
+
+    public void run() {
+      try (NamedThread namedThread = new NamedThread(getThreadName())) {
+        processQuery(pollResponse);
+      } finally {
+        executorStopped(this);
+      }
+    }
+
+    private String getThreadName() {
+      return String.format("Scheduled Query Executor(schedule:%s, execution_id:%d)",
+          pollResponse.getScheduleKey().getScheduleName(), pollResponse.getExecutionId());
+    }
+
     public synchronized void reportQueryProgress() {
       if (info != null) {
         LOG.info("Reporting query progress of {} as {} err:{}", info.getScheduledExecutionId(), info.getState(),
@@ -128,10 +213,12 @@ public class ScheduledQueryExecutionService implements Closeable {
     }
 
     private void processQuery(ScheduledQueryPollResponse q) {
-      SessionState state = null;
+      LOG.info("Executing schq:{}, executionId: {}", q.getScheduleKey().getScheduleName(), q.getExecutionId());
       info = new ScheduledQueryProgressInfo();
-      info.setScheduledExecutionId(q.getExecutionId());
+      info.setScheduledExecutionId(pollResponse.getExecutionId());
       info.setState(QueryState.EXECUTING);
+      info.setExecutorQueryId(buildExecutorQueryId(""));
+      SessionState state = null;
       try {
         HiveConf conf = new HiveConf(context.conf);
         conf.set(Constants.HIVE_QUERY_EXCLUSIVE_LOCK, lockNameFor(q.getScheduleKey()));
@@ -162,7 +249,11 @@ public class ScheduledQueryExecutionService implements Closeable {
     }
 
     private String buildExecutorQueryId(IDriver driver) {
-      return String.format("%s/%s", context.executorHostName, driver.getQueryState().getQueryId());
+      return buildExecutorQueryId(driver.getQueryState().getQueryId());
+    }
+
+    private String buildExecutorQueryId(String queryId) {
+      return String.format("%s/%s", context.executorHostName, queryId);
     }
 
     private String lockNameFor(ScheduledQueryKey scheduleKey) {
@@ -179,43 +270,56 @@ public class ScheduledQueryExecutionService implements Closeable {
     }
   }
 
+  /**
+   * Reports progress periodically.
+   *
+   * To keep the state of all in-flight scheduled query executions up to date,
+   * this class initiates a reporting round periodically.
+   */
   class ProgressReporter implements Runnable {
 
     @Override
     public void run() {
-      while (true) {
-        try {
-          Thread.sleep(context.getProgressReporterSleepTime());
-        } catch (InterruptedException e) {
-          LOG.warn("interrupt discarded");
-        }
-        try {
-          worker.reportQueryProgress();
-        } catch (Exception e) {
-          LOG.error("ProgressReporter encountered exception ", e);
+      try (NamedThread namedThread = new NamedThread("Scheduled Query Progress Reporter")) {
+        while (!context.executor.isShutdown()) {
+          try {
+            Thread.sleep(context.getProgressReporterSleepTime());
+          } catch (InterruptedException e) {
+            LOG.warn("interrupt discarded");
+          }
+          try {
+            for (ScheduledQueryExecutor worker : runningExecutors) {
+              worker.reportQueryProgress();
+            }
+          } catch (Exception e) {
+            LOG.error("ProgressReporter encountered exception ", e);
+          }
         }
       }
     }
   }
 
-  @VisibleForTesting
   @Override
   public void close() throws IOException {
     synchronized (ScheduledQueryExecutionService.class) {
       if (INSTANCE == null || INSTANCE != this) {
         throw new IllegalStateException("The current ScheduledQueryExecutionService INSTANCE is invalid");
       }
-      INSTANCE = null;
       context.executor.shutdown();
+      forceScheduleCheck();
       try {
         context.executor.awaitTermination(1, TimeUnit.SECONDS);
         context.executor.shutdownNow();
+        INSTANCE = null;
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
     }
   }
 
+  /**
+   * Forces the poller thread to re-check schedules before the normal timeout happens.
+   */
   public static void forceScheduleCheck() {
     INSTANCE.forcedScheduleCheckCounter.incrementAndGet();
   }
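
The service now runs a single poller plus up to hive.scheduled.queries.max.executors executor threads, each owning one execution; usedExecutors gates polling so nothing is fetched while all slots are busy, and every finished executor forces another schedule check. To observe several in-flight executions, the same query used in schq_analyze.q can be widened, for example:

    select * from information_schema.scheduled_executions
    order by scheduled_execution_id desc limit 10;
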
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java b/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java
index a8fe0c3..dd8da34 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java
@@ -113,7 +113,6 @@ public class TestScheduledQueryService {
 
     @Override
     public ScheduledQueryPollResponse scheduledQueryPoll() {
-
       ScheduledQueryPollResponse r = new ScheduledQueryPollResponse();
       r.setExecutionId(id++);
       r.setQuery(stmt);
@@ -154,8 +153,6 @@ public class TestScheduledQueryService {
     MockScheduledQueryService qService = new MockScheduledQueryService("insert into tu values(1),(2),(3),(4),(5)");
     ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService);
     try (ScheduledQueryExecutionService sQ = ScheduledQueryExecutionService.startScheduledQueryExecutorService(ctx)) {
-
-      executor.shutdown();
       // Wait for the scheduled query to finish. Hopefully 30 seconds should be more than enough.
       SessionState.getConsole().logInfo("Waiting for query execution to finish ...");
       synchronized (qService.notifier) {
@@ -163,7 +160,6 @@ public class TestScheduledQueryService {
       }
       SessionState.getConsole().logInfo("Done waiting for query execution!");
     }
-    executor.shutdownNow();
 
     assertThat(qService.lastProgressInfo.isSetExecutorQueryId(), is(true));
     assertThat(qService.lastProgressInfo.getExecutorQueryId(),
diff --git a/ql/src/test/queries/clientpositive/schq_analyze.q b/ql/src/test/queries/clientpositive/schq_analyze.q
index 969b47b..3c03360 100644
--- a/ql/src/test/queries/clientpositive/schq_analyze.q
+++ b/ql/src/test/queries/clientpositive/schq_analyze.q
@@ -21,7 +21,7 @@ create scheduled query t_analyze cron '0 */1 * * * ? *' as analyze table t compu
 
 alter scheduled query t_analyze execute;
 
-!sleep 3; 
+!sleep 10; 
  
 select * from information_schema.scheduled_executions s where schedule_name='ex_analyze' order by scheduled_execution_id desc limit 3;
  
diff --git a/ql/src/test/queries/clientpositive/schq_materialized.q b/ql/src/test/queries/clientpositive/schq_materialized.q
index 6baed49..9848f9f 100644
--- a/ql/src/test/queries/clientpositive/schq_materialized.q
+++ b/ql/src/test/queries/clientpositive/schq_materialized.q
@@ -71,7 +71,7 @@ select `(NEXT_EXECUTION|SCHEDULED_QUERY_ID)?+.+` from sys.scheduled_queries;
 
 alter scheduled query d execute;
 
-!sleep 3;
+!sleep 10;
 
 -- the scheduled execution will fail - because of missing TXN; but overall it works..
 select state,error_message from sys.scheduled_executions;
diff --git a/ql/src/test/results/clientpositive/llap/schq_materialized.q.out b/ql/src/test/results/clientpositive/llap/schq_materialized.q.out
index e904d46..194a1b3 100644
--- a/ql/src/test/results/clientpositive/llap/schq_materialized.q.out
+++ b/ql/src/test/results/clientpositive/llap/schq_materialized.q.out
@@ -277,7 +277,7 @@ POSTHOOK: query: select `(NEXT_EXECUTION|SCHEDULED_QUERY_ID)?+.+` from sys.sched
 POSTHOOK: type: QUERY
 POSTHOOK: Input: sys@scheduled_queries
 #### A masked pattern was here ####
-d	true	hive	0 0 * * * ? *	hive_admin_user	alter materialized view `default`.`mv1` rebuild
+d	true	hive	0 0 * * * ? *	hive_admin_user	alter materialized view `default`.`mv1` rebuild	NULL
 PREHOOK: query: alter scheduled query d execute
 PREHOOK: type: ALTER SCHEDULED QUERY
 POSTHOOK: query: alter scheduled query d execute
diff --git a/ql/src/test/results/clientpositive/llap/sysdb.q.out b/ql/src/test/results/clientpositive/llap/sysdb.q.out
index 38cadf3..8b0be82 100644
--- a/ql/src/test/results/clientpositive/llap/sysdb.q.out
+++ b/ql/src/test/results/clientpositive/llap/sysdb.q.out
@@ -813,6 +813,8 @@ scheduled_executions	start_time
 scheduled_executions	start_time
 scheduled_executions	state
 scheduled_executions	state
+scheduled_queries	active_execution_id
+scheduled_queries	active_execution_id
 scheduled_queries	cluster_namespace
 scheduled_queries	cluster_namespace
 scheduled_queries	enabled
@@ -1069,8 +1071,8 @@ POSTHOOK: Input: sys@columns_v2
 a	decimal(10,2)	0
 acquired_at	string	9
 action_expression	string	4
-add_time	int	1
-agent_info	string	6
+active_execution_id	bigint	8
+active_execution_id	bigint	8
 PREHOOK: query: select param_key, param_value from database_params order by param_key, param_value limit 5
 PREHOOK: type: QUERY
 PREHOOK: Input: sys@database_params
@@ -1419,9 +1421,9 @@ POSTHOOK: Input: sys@table_params
 #### A masked pattern was here ####
 COLUMN_STATS_ACCURATE	{"BASIC_STATS":"true","COLUMN_STATS":{"a":"true","b":"true","c":"true","d":"true","e":"true","f":"true","g":"true"}}
 COLUMN_STATS_ACCURATE	{"BASIC_STATS":"true","COLUMN_STATS":{"action_expression":"true","name":"true","ns":"true","rp_name":"true","trigger_expression":"true"}}
+COLUMN_STATS_ACCURATE	{"BASIC_STATS":"true","COLUMN_STATS":{"active_execution_id":"true","cluster_namespace":"true","enabled":"true","next_execution":"true","query":"true","schedule":"true","schedule_name":"true","scheduled_query_id":"true","user":"true"}}
 COLUMN_STATS_ACCURATE	{"BASIC_STATS":"true","COLUMN_STATS":{"add_time":"true","grant_option":"true","grantor":"true","grantor_type":"true","principal_name":"true","principal_type":"true","role_grant_id":"true","role_id":"true"}}
 COLUMN_STATS_ACCURATE	{"BASIC_STATS":"true","COLUMN_STATS":{"alloc_fraction":"true","ns":"true","path":"true","query_parallelism":"true","rp_name":"true","scheduling_policy":"true"}}
-COLUMN_STATS_ACCURATE	{"BASIC_STATS":"true","COLUMN_STATS":{"authorizer":"true","column_name":"true","create_time":"true","grant_option":"true","grantor":"true","grantor_type":"true","part_col_priv":"true","part_column_grant_id":"true","part_id":"true","principal_name":"true","principal_type":"true"}}
 PREHOOK: query: select tbl_name from tbls order by tbl_name limit 5
 PREHOOK: type: QUERY
 PREHOOK: Input: sys@tbls
@@ -1536,9 +1538,9 @@ POSTHOOK: Input: sys@table_stats_view
 #### A masked pattern was here ####
 {"BASIC_STATS":"true","COLUMN_STATS":{"a":"true","b":"true","c":"true","d":"true","e":"true","f":"true","g":"true"}}	0	0	0	0
 {"BASIC_STATS":"true","COLUMN_STATS":{"action_expression":"true","name":"true","ns":"true","rp_name":"true","trigger_expression":"true"}}	0	0	0	0
+{"BASIC_STATS":"true","COLUMN_STATS":{"active_execution_id":"true","cluster_namespace":"true","enabled":"true","next_execution":"true","query":"true","schedule":"true","schedule_name":"true","scheduled_query_id":"true","user":"true"}}	0	0	0	0
 {"BASIC_STATS":"true","COLUMN_STATS":{"add_time":"true","grant_option":"true","grantor":"true","grantor_type":"true","principal_name":"true","principal_type":"true","role_grant_id":"true","role_id":"true"}}	0	0	0	0
 {"BASIC_STATS":"true","COLUMN_STATS":{"alloc_fraction":"true","ns":"true","path":"true","query_parallelism":"true","rp_name":"true","scheduling_policy":"true"}}	0	0	0	0
-{"BASIC_STATS":"true","COLUMN_STATS":{"authorizer":"true","column_name":"true","create_time":"true","grant_option":"true","grantor":"true","grantor_type":"true","part_col_priv":"true","part_column_grant_id":"true","part_id":"true","principal_name":"true","principal_type":"true"}}	0	0	0	0
 PREHOOK: query: select COLUMN_STATS_ACCURATE, NUM_FILES, NUM_ROWS, RAW_DATA_SIZE, TOTAL_SIZE FROM PARTITION_STATS_VIEW where COLUMN_STATS_ACCURATE is not null order by NUM_FILES, NUM_ROWS, RAW_DATA_SIZE limit 5
 PREHOOK: type: QUERY
 PREHOOK: Input: sys@partition_params
diff --git a/ql/src/test/results/clientpositive/llap/sysdb_schq.q.out b/ql/src/test/results/clientpositive/llap/sysdb_schq.q.out
index 8745e3b..3239c36 100644
--- a/ql/src/test/results/clientpositive/llap/sysdb_schq.q.out
+++ b/ql/src/test/results/clientpositive/llap/sysdb_schq.q.out
@@ -27,6 +27,7 @@ schedule            	string              	from deserializer
 user                	string              	from deserializer   
 query               	string              	from deserializer   
 next_execution      	bigint              	from deserializer   
+active_execution_id 	bigint              	from deserializer   
 	 	 
 # Detailed Table Information	 	 
 Database:           	sys                 	 
@@ -35,7 +36,7 @@ Retention:          	0
 #### A masked pattern was here ####
 Table Type:         	EXTERNAL_TABLE      	 
 Table Parameters:	 	 
-	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"cluster_namespace\":\"true\",\"enabled\":\"true\",\"next_execution\":\"true\",\"query\":\"true\",\"schedule\":\"true\",\"schedule_name\":\"true\",\"scheduled_query_id\":\"true\",\"user\":\"true\"}}
+	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"active_execution_id\":\"true\",\"cluster_namespace\":\"true\",\"enabled\":\"true\",\"next_execution\":\"true\",\"query\":\"true\",\"schedule\":\"true\",\"schedule_name\":\"true\",\"scheduled_query_id\":\"true\",\"user\":\"true\"}}
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
 	hive.sql.database.type	METASTORE           
@@ -47,7 +48,8 @@ Table Parameters:
 	                    	  \"SCHEDULE\",     
 	                    	  \"USER\",         
 	                    	  \"QUERY\",        
-	                    	  \"NEXT_EXECUTION\"
+	                    	  \"NEXT_EXECUTION\",
+	                    	  \"ACTIVE_EXECUTION_ID\"
 	                    	FROM                
 	                    	  \"SCHEDULED_QUERIES\"
 	numFiles            	0                   
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
index e5d21b0..d56bc2a 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.metastore;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -35,4 +34,13 @@ public interface MetastoreTaskThread extends Configurable, Runnable {
    * @return frequency
    */
   long runFrequency(TimeUnit unit);
+
+  /**
+   * Gets the initial delay before the first execution.
+   *
+   * Defaults to {@link #runFrequency(TimeUnit)}
+   */
+  default long initialDelay(TimeUnit unit) {
+    return runFrequency(unit);
+  }
 }
diff --git a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/package-info.java b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/package-info.java
deleted file mode 100644
index 2eb51c8..0000000
--- a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Package consisting the utility methods for metastore.
- */
-package org.apache.hadoop.hive.metastore.utils;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 1a5944d..8a826d2 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -12667,7 +12667,7 @@ public class ObjectStore implements RawStore, Configurable {
     try {
       openTransaction();
       Query q = pm.newQuery(MScheduledQuery.class,
-          "nextExecution <= now && enabled && clusterNamespace == ns");
+          "nextExecution <= now && enabled && clusterNamespace == ns && activeExecution == null");
       q.setSerializeRead(true);
       q.declareParameters("java.lang.Integer now, java.lang.String ns");
       q.setOrdering("nextExecution");
@@ -12685,6 +12685,7 @@ public class ObjectStore implements RawStore, Configurable {
       execution.setState(QueryState.INITED);
       execution.setStartTime(now);
       execution.setLastUpdateTime(now);
+      schq.setActiveExecution(execution);
       pm.makePersistent(execution);
       pm.makePersistent(schq);
       ObjectStoreTestHook.onScheduledQueryPoll();
@@ -12735,6 +12736,7 @@ public class ObjectStore implements RawStore, Configurable {
       case TIMED_OUT:
         execution.setEndTime((int) (System.currentTimeMillis() / 1000));
         execution.setLastUpdateTime(null);
+        execution.getScheduledQuery().setActiveExecution(null);
         break;
       default:
         throw new InvalidOperationException("invalid state: " + info.getState());
@@ -12967,6 +12969,8 @@ public class ObjectStore implements RawStore, Configurable {
         //        info.set
         scheduledQueryProgress(info);
       }
+
+      recoverInvalidScheduledQueryState(timeoutSecs);
       committed = commitTransaction();
       return results.size();
     } finally {
@@ -12975,4 +12979,21 @@ public class ObjectStore implements RawStore, Configurable {
       }
     }
   }
+
+  private void recoverInvalidScheduledQueryState(int timeoutSecs) {
+    int maxLastUpdateTime = (int) (System.currentTimeMillis() / 1000) - timeoutSecs;
+    Query q = pm.newQuery(MScheduledQuery.class);
+    q.setFilter("activeExecution != null");
+
+    List<MScheduledQuery> results = (List<MScheduledQuery>) q.execute();
+    for (MScheduledQuery e : results) {
+      Integer lastUpdateTime = e.getActiveExecution().getLastUpdateTime();
+      if (lastUpdateTime == null || lastUpdateTime < maxLastUpdateTime) {
+        LOG.error("Scheduled query: {} stuck with an activeExecution - clearing",
+            scheduledQueryKeyRef(e.getScheduleKey()));
+        e.setActiveExecution(null);
+        pm.makePersistent(e);
+      }
+    }
+  }
 }
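
recoverInvalidScheduledQueryState runs during the maintenance pass: any schedule whose active execution has not reported within the timeout gets its ACTIVE_EXECUTION_ID cleared, so the schedule becomes pollable again after an executor dies. A rough way to inspect the backing table directly (quoting shown for a PostgreSQL-backed metastore) might be:

    -- schedules that currently claim an active (possibly stuck) execution
    select "SCHEDULED_QUERY_ID", "SCHEDULE_NAME", "ACTIVE_EXECUTION_ID"
    from "SCHEDULED_QUERIES"
    where "ACTIVE_EXECUTION_ID" is not null;
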
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ScheduledQueryExecutionsMaintTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ScheduledQueryExecutionsMaintTask.java
index d678d01..4c1b34d 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ScheduledQueryExecutionsMaintTask.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ScheduledQueryExecutionsMaintTask.java
@@ -35,6 +35,13 @@ public class ScheduledQueryExecutionsMaintTask implements MetastoreTaskThread {
   private Configuration conf;
 
   @Override
+  public long initialDelay(TimeUnit unit) {
+    // no delay before the first execution;
+    // after an ungraceful shutdown it might otherwise take a long time to notice that in-flight scheduled queries are no longer running
+    return 0;
+  }
+
+  @Override
   public long runFrequency(TimeUnit unit) {
     return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.SCHEDULED_QUERIES_EXECUTION_MAINT_TASK_FREQUENCY,
         unit);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MScheduledQuery.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MScheduledQuery.java
index d055e7d..c80241b 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MScheduledQuery.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MScheduledQuery.java
@@ -35,6 +35,7 @@ public class MScheduledQuery {
   private String user;
   private String query;
   private Integer nextExecution;
+  private MScheduledExecution activeExecution;
   private Set<MScheduledExecution> executions;
 
   public MScheduledQuery(ScheduledQuery s) {
@@ -112,4 +113,12 @@ public class MScheduledQuery {
     return user;
   }
 
+  public void setActiveExecution(MScheduledExecution execution) {
+    activeExecution = execution;
+  }
+
+  public MScheduledExecution getActiveExecution() {
+    return activeExecution;
+  }
+
 }
diff --git a/standalone-metastore/metastore-server/src/main/resources/package.jdo b/standalone-metastore/metastore-server/src/main/resources/package.jdo
index ce919e4..88eabfa 100644
--- a/standalone-metastore/metastore-server/src/main/resources/package.jdo
+++ b/standalone-metastore/metastore-server/src/main/resources/package.jdo
@@ -1471,6 +1471,10 @@
       <field name="nextExecution">
         <column name="NEXT_EXECUTION" jdbc-type="integer" allows-null="true"/>
       </field>
+      <field name="activeExecution">
+        <column name="ACTIVE_EXECUTION_ID" allows-null="true"/>
+        <foreign-key name="SCHEDULED_EXECUTIONS_SCHQ_ACTIVE" delete-action="cascade"/>
+      </field>
       <field name="executions" mapped-by="scheduledQuery" dependent-element="true">
         <collection element-type="MScheduledExecution" dependent-element="true" />
         <foreign-key name="SCHEDULED_EXECUTIONS_SCHQ_FK" delete-action="cascade"/>
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
index 29b20e4..48ad676 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
@@ -749,7 +749,8 @@ CREATE TABLE "APP"."SCHEDULED_QUERIES" (
   "USER" varchar(128) not null,
   "SCHEDULE" varchar(256) not null,
   "QUERY" varchar(4000) not null,
-  "NEXT_EXECUTION" integer not null
+  "NEXT_EXECUTION" integer not null,
+  "ACTIVE_EXECUTION_ID" bigint
 );
 
 CREATE INDEX NEXTEXECUTIONINDEX ON APP.SCHEDULED_QUERIES (ENABLED,CLUSTER_NAMESPACE,NEXT_EXECUTION);
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
index e8fe6a2..7a230bd 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
@@ -58,6 +58,8 @@ ALTER TABLE "APP"."KEY_CONSTRAINTS" ADD CONSTRAINT "CONSTRAINTS_PK" PRIMARY KEY
 
 -- HIVE-21487
 CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION);
+-- HIVE-22872
+ALTER TABLE "SCHEDULED_QUERIES" ADD "ACTIVE_EXECUTION_ID" bigint;
 
 -- This needs to be the last thing done.  Insert any changes above this line.
 UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
\ No newline at end of file
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
index 955a94b..a2cf981 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
@@ -1293,6 +1293,35 @@ ALTER TABLE TXN_WRITE_NOTIFICATION_LOG ADD CONSTRAINT TXN_WRITE_NOTIFICATION_LOG
 
 INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
 
+CREATE TABLE "SCHEDULED_QUERIES" (
+	"SCHEDULED_QUERY_ID"  bigint NOT NULL,
+	"CLUSTER_NAMESPACE" VARCHAR(256),
+	"ENABLED" bit NOT NULL DEFAULT 0,
+	"NEXT_EXECUTION" INTEGER,
+	"QUERY" VARCHAR(4000),
+	"SCHEDULE" VARCHAR(256),
+	"SCHEDULE_NAME" VARCHAR(256),
+	"USER" VARCHAR(256),
+	"ACTIVE_EXECUTION_ID" bigint,
+	CONSTRAINT SCHEDULED_QUERIES_PK PRIMARY KEY ("SCHEDULED_QUERY_ID")
+);
+
+CREATE TABLE "SCHEDULED_EXECUTIONS" (
+	"SCHEDULED_EXECUTION_ID" bigint NOT NULL,
+	"END_TIME" INTEGER,
+	"ERROR_MESSAGE" VARCHAR(2000),
+	"EXECUTOR_QUERY_ID" VARCHAR(256),
+	"LAST_UPDATE_TIME" INTEGER,
+	"SCHEDULED_QUERY_ID" bigint,
+	"START_TIME" INTEGER,
+	"STATE" VARCHAR(256),
+	CONSTRAINT SCHEDULED_EXECUTIONS_PK PRIMARY KEY ("SCHEDULED_EXECUTION_ID"),
+	CONSTRAINT SCHEDULED_EXECUTIONS_SCHQ_FK FOREIGN KEY ("SCHEDULED_QUERY_ID") REFERENCES "SCHEDULED_QUERIES"("SCHEDULED_QUERY_ID") ON DELETE CASCADE
+);
+
+CREATE INDEX IDX_SCHEDULED_EX_LAST_UPDATE ON "SCHEDULED_EXECUTIONS" ("LAST_UPDATE_TIME");
+CREATE INDEX IDX_SCHEDULED_EX_SQ_ID ON "SCHEDULED_EXECUTIONS" ("SCHEDULED_QUERY_ID");
+
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script
 -- -----------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
index a554f8a..4a58770 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
@@ -33,6 +33,36 @@ ALTER TABLE KEY_CONSTRAINTS ADD CONSTRAINT CONSTRAINTS_PK PRIMARY KEY (PARENT_TB
 -- HIVE-21487
 CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION);
 
+CREATE TABLE "SCHEDULED_QUERIES" (
+	"SCHEDULED_QUERY_ID"  bigint NOT NULL,
+	"CLUSTER_NAMESPACE" VARCHAR(256),
+	"ENABLED" bit NOT NULL DEFAULT 0,
+	"NEXT_EXECUTION" INTEGER,
+	"QUERY" VARCHAR(4000),
+	"SCHEDULE" VARCHAR(256),
+	"SCHEDULE_NAME" VARCHAR(256),
+	"USER" VARCHAR(256),
+	"ACTIVE_EXECUTION_ID" bigint,
+	CONSTRAINT SCHEDULED_QUERIES_PK PRIMARY KEY ("SCHEDULED_QUERY_ID")
+);
+
+CREATE TABLE "SCHEDULED_EXECUTIONS" (
+	"SCHEDULED_EXECUTION_ID" bigint NOT NULL,
+	"END_TIME" INTEGER,
+	"ERROR_MESSAGE" VARCHAR(2000),
+	"EXECUTOR_QUERY_ID" VARCHAR(256),
+	"LAST_UPDATE_TIME" INTEGER,
+	"SCHEDULED_QUERY_ID" bigint,
+	"START_TIME" INTEGER,
+	"STATE" VARCHAR(256),
+	CONSTRAINT SCHEDULED_EXECUTIONS_PK PRIMARY KEY ("SCHEDULED_EXECUTION_ID"),
+	CONSTRAINT SCHEDULED_EXECUTIONS_SCHQ_FK FOREIGN KEY ("SCHEDULED_QUERY_ID") REFERENCES "SCHEDULED_QUERIES"("SCHEDULED_QUERY_ID") ON DELETE CASCADE
+);
+
+CREATE INDEX IDX_SCHEDULED_EX_LAST_UPDATE ON "SCHEDULED_EXECUTIONS" ("LAST_UPDATE_TIME");
+CREATE INDEX IDX_SCHEDULED_EX_SQ_ID ON "SCHEDULED_EXECUTIONS" ("SCHEDULED_QUERY_ID");
+
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
index 63c97e6..bc34b51 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
@@ -1224,6 +1224,7 @@ CREATE TABLE SCHEDULED_QUERIES (
 	SCHEDULE VARCHAR(256),
 	SCHEDULE_NAME VARCHAR(256),
 	`USER` VARCHAR(256),
+	ACTIVE_EXECUTION_ID INTEGER,
 	CONSTRAINT SCHEDULED_QUERIES_PK PRIMARY KEY (SCHEDULED_QUERY_ID)
 );
 
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
index c175d4c..13f03bc 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
@@ -62,6 +62,8 @@ ALTER TABLE `KEY_CONSTRAINTS` ADD CONSTRAINT `CONSTRAINTS_PK` PRIMARY KEY (`PARE
 
 -- HIVE-21487
 CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION);
+-- HIVE-22872
+ALTER TABLE SCHEDULED_QUERIES ADD COLUMN ACTIVE_EXECUTION_ID INTEGER ;
 
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
index 4338d9c..8482b59 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
@@ -1199,6 +1199,7 @@ CREATE TABLE "SCHEDULED_QUERIES" (
 	"SCHEDULE" VARCHAR(256),
 	"SCHEDULE_NAME" VARCHAR(256),
 	"USER" VARCHAR(256),
+	"ACTIVE_EXECUTION_ID" number(19),
 	CONSTRAINT SCHEDULED_QUERIES_PK PRIMARY KEY ("SCHEDULED_QUERY_ID")
 );
 
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
index cec7f53..cbfdd86 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
@@ -62,6 +62,8 @@ ALTER TABLE KEY_CONSTRAINTS ADD CONSTRAINT CONSTRAINTS_PK PRIMARY KEY (PARENT_TB
 
 -- HIVE-21487
 CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION);
+-- HIVE-22872
+ALTER TABLE SCHEDULED_QUERIES ADD ACTIVE_EXECUTION_ID number(19);
 
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
index e78fff1..aa35a7a 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
@@ -1887,6 +1887,7 @@ CREATE TABLE "SCHEDULED_QUERIES" (
     "SCHEDULE" VARCHAR(256),
     "SCHEDULE_NAME" VARCHAR(256),
     "USER" VARCHAR(256),
+    "ACTIVE_EXECUTION_ID" BIGINT,
     CONSTRAINT "SCHEDULED_QUERIES_PK" PRIMARY KEY ("SCHEDULED_QUERY_ID")
 );
 
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
index 52953f0..9462328 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
@@ -193,6 +193,8 @@ ALTER TABLE "KEY_CONSTRAINTS" ADD CONSTRAINT "CONSTRAINTS_PK" PRIMARY KEY ("PARE
 
 -- HIVE-21487
 CREATE INDEX "COMPLETED_COMPACTIONS_RES" ON "COMPLETED_COMPACTIONS" ("CC_DATABASE","CC_TABLE","CC_PARTITION");
+-- HIVE-22872
+ALTER TABLE "SCHEDULED_QUERIES" ADD "ACTIVE_EXECUTION_ID" bigint;
 
 -- These lines need to be last. Insert any changes above.
 UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1;